Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/08/27 23:48:54 UTC

[GitHub] [hudi] satishkotha opened a new pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

satishkotha opened a new pull request #2048:
URL: https://github.com/apache/hudi/pull/2048


   
   ## What is the purpose of the pull request
   I am following up on the feedback in apache#1859: we want to make replace a top-level action. This is a WIP, but I am publishing it to give a high-level idea of the changes required.
   
   ## Brief change log
   
   There are multiple challenges in making replace a top-level action:
   
   1. In the write path, we create the .commit file in two places: BaseActionCommitExecutor and HoodieSparkSqlWriter. Both places need to change to create a .replace file.
   2. All post-commit actions work on top of HoodieCommitMetadata. For now, replace also uses the same class to keep the change simple. We can split HoodieCommitMetadata into a class hierarchy (I am not sure how JSON serialization works with inheritance) or discuss other alternatives.
   3. Many places in the code assume that the commit action type is tied to the table type. For example, the action type can only be either '.commit' or '.deltacommit', as seen [here](https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java#L474). Changing this to add replace seemed error-prone and tedious; we may have to add better abstractions here.
   4. We identify whether a parquet file is valid by checking whether there is a corresponding '.commit' file. If we create only a .replace file, we have to change a lot of places to make sure new files created by replace are used.
   5. Everywhere we invoke 'getCommitsTimeline'/'filterCommits', we need to review the caller and make sure it can handle replace actions, or else create new methods and refactor all invocations of getCommitsTimeline to call the new methods.
   
   I made most of the changes for items 1, 2, and 3 above. We need to discuss whether this is the right approach and how to extend it to items 4 and 5.
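   
   To illustrate item 3, here is a minimal sketch of one possible abstraction: the action type is derived from the write operation first and from the table type only as a fallback. All names below (CommitActionSketch, TableType, Operation, actionFor) are hypothetical and are not Hudi APIs; the "replace" action name is an assumption that mirrors the .replace files in this PR.

```java
// Hypothetical sketch: none of these names are Hudi APIs.
final class CommitActionSketch {
  enum TableType { COPY_ON_WRITE, MERGE_ON_READ }

  enum Operation { UPSERT, INSERT, INSERT_OVERWRITE }

  // Pick the timeline action from the operation first, then fall back to the table type.
  static String actionFor(TableType tableType, Operation operation) {
    if (operation == Operation.INSERT_OVERWRITE) {
      // Assumed action name, matching the .replace files created in this PR.
      return "replace";
    }
    return tableType == TableType.MERGE_ON_READ ? "deltacommit" : "commit";
  }

  public static void main(String[] args) {
    System.out.println(actionFor(TableType.COPY_ON_WRITE, Operation.UPSERT));
    System.out.println(actionFor(TableType.MERGE_ON_READ, Operation.UPSERT));
    System.out.println(actionFor(TableType.MERGE_ON_READ, Operation.INSERT_OVERWRITE));
  }
}
```

   Keeping the decision in one place would mean callers no longer need to assume that the table type alone determines the action type.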
   
   ## Verify this pull request
   This change added tests. I verified basic actions using the quick start and docker setup. Adding more tests is in progress, but I wanted to get feedback on the high-level approach.
   
   This is an example .hoodie folder from quick start setup:
   -rw-r--r--  1 satishkotha  wheel  1933 Aug 27 16:39 20200827163904.commit
   -rw-r--r--  1 satishkotha  wheel     0 Aug 27 16:39 20200827163904.commit.requested
   -rw-r--r--  1 satishkotha  wheel  1015 Aug 27 16:39 20200827163904.inflight
   -rw-r--r--  1 satishkotha  wheel  2610 Aug 27 16:39 20200827163927.replace
   -rw-r--r--  1 satishkotha  wheel  1024 Aug 27 16:39 20200827163927.replace.inflight
   -rw-r--r--  1 satishkotha  wheel     0 Aug 27 16:39 20200827163927.replace.requested
   drwxr-xr-x  2 satishkotha  wheel    64 Aug 27 16:39 archived
   -rw-r--r--  1 satishkotha  wheel   235 Aug 27 16:39 hoodie.properties
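   
   The naming pattern in the listing above can be read as timestamp, action, and state. The following is a rough sketch of a parser for such names, not Hudi's own parsing code. The class InstantFileNameSketch is hypothetical, and treating a bare ".inflight" suffix as an inflight commit is an assumption inferred from the listing.

```java
import java.util.Locale;

// Hypothetical sketch: a tiny parser for timeline file names like the ones listed above.
final class InstantFileNameSketch {
  final String timestamp;
  final String action;
  final String state;

  private InstantFileNameSketch(String timestamp, String action, String state) {
    this.timestamp = timestamp;
    this.action = action;
    this.state = state;
  }

  static InstantFileNameSketch parse(String fileName) {
    String[] parts = fileName.split("\\.");
    // Timeline files in the listing start with a 14-digit timestamp and have one or two suffixes.
    if (parts.length < 2 || parts.length > 3 || !parts[0].matches("\\d{14}")) {
      throw new IllegalArgumentException("Not a timeline file: " + fileName);
    }
    if (parts.length == 2) {
      // Assumption: a bare ".inflight" suffix belongs to a commit, as the listing suggests.
      return "inflight".equals(parts[1])
          ? new InstantFileNameSketch(parts[0], "commit", "INFLIGHT")
          : new InstantFileNameSketch(parts[0], parts[1], "COMPLETED");
    }
    return new InstantFileNameSketch(parts[0], parts[1], parts[2].toUpperCase(Locale.ROOT));
  }

  public static void main(String[] args) {
    InstantFileNameSketch instant = parse("20200827163927.replace.inflight");
    System.out.println(instant.timestamp + " " + instant.action + " " + instant.state);
  }
}
```

   Names such as hoodie.properties or archived are rejected by the timestamp check, so only timeline files are parsed.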
   
   
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bvaradar commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r493002158



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
##########
@@ -251,6 +262,28 @@ private void addRollbackInstant(HoodieTimeline timeline, HoodieInstant instant)
     LOG.info("Done Syncing rollback instant (" + instant + ")");
   }
 
+  /**
+   * Add newly found REPLACE instant.
+   *
+   * @param timeline Hoodie Timeline
+   * @param instant REPLACE Instant
+   */
+  private void addReplaceInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {

Review comment:
       Discussed with @satishkotha. We will track the commit action type along with the instant, since we have introduced another type of "commit": replace. Restore can then perform custom handling only for this action type. It is safer to let the incremental file-system view revert the replace mappings (in memory or RocksDB) by providing the replace instant time, as this is robust in case of partial restore failures.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -738,7 +799,9 @@ private String formatPartitionKey(String partitionStr) {
    * @param commitsToReturn Commits
    */
   Stream<FileSlice> fetchLatestFileSliceInRange(List<String> commitsToReturn) {
-    return fetchAllStoredFileGroups().map(fileGroup -> fileGroup.getLatestFileSliceInRange(commitsToReturn))
+    return fetchAllStoredFileGroups()
+        .filter(fileGroup -> !isFileGroupReplacedBeforeAny(fileGroup, commitsToReturn))

Review comment:
       @satishkotha: As discussed, all the replace filtering needs to move to the getXXX() APIs, as the fetch APIs are only responsible for fetching file slices/base files from different types of storage.
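   
   A minimal sketch of the split described above, assuming file groups are identified by plain strings and instant times compare lexicographically because they have a fixed width. FileSystemViewSketch and its methods are hypothetical stand-ins, not the real AbstractTableFileSystemView API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch: fetch* only reads from storage, get* applies replace filtering.
final class FileSystemViewSketch {
  private final List<String> storedFileGroupIds = new ArrayList<>();
  // File group id -> instant time of the replace action that superseded it.
  private final Map<String, String> replacedFileGroups = new HashMap<>();

  void addFileGroup(String fileGroupId) {
    storedFileGroupIds.add(fileGroupId);
  }

  void markReplaced(String fileGroupId, String replaceInstantTime) {
    replacedFileGroups.put(fileGroupId, replaceInstantTime);
  }

  // Storage-level fetch: returns everything that is stored, with no replace filtering.
  Stream<String> fetchAllStoredFileGroups() {
    return storedFileGroupIds.stream();
  }

  // Public get: hides file groups replaced on or before the given instant time.
  List<String> getAllFileGroups(String maxInstantTime) {
    return fetchAllStoredFileGroups()
        .filter(id -> !isReplacedOnOrBefore(id, maxInstantTime))
        .collect(Collectors.toList());
  }

  private boolean isReplacedOnOrBefore(String fileGroupId, String maxInstantTime) {
    String replacedAt = replacedFileGroups.get(fileGroupId);
    return replacedAt != null && replacedAt.compareTo(maxInstantTime) <= 0;
  }

  public static void main(String[] args) {
    FileSystemViewSketch view = new FileSystemViewSketch();
    view.addFileGroup("fg1");
    view.addFileGroup("fg2");
    view.markReplaced("fg1", "20200827163927");
    System.out.println(view.getAllFileGroups("20200827163927"));
  }
}
```

   With this layering, each storage backend only has to implement the fetch method, and the replace semantics live in one place.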







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488283893



##########
File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
##########
@@ -378,9 +398,9 @@ public static void createCompactionAuxiliaryMetadata(String basePath, HoodieInst
         new Path(basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instant.getFileName());
     FileSystem fs = FSUtils.getFs(basePath, configuration);
     try (FSDataOutputStream os = fs.create(commitFile, true)) {
-      HoodieCompactionPlan workload = new HoodieCompactionPlan();
+      HoodieCompactionPlan workload = HoodieCompactionPlan.newBuilder().setVersion(1).build();

Review comment:
       The version is not set by default, so reading the test plan generated here fails. I therefore set the version explicitly.







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r494727484



##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -586,24 +602,39 @@ public String startCommit() {
    * @param instantTime Instant time to be generated
    */
   public void startCommitWithTime(String instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);

Review comment:
       This does not call the function on the next line (it calls the method after that), so we only create the meta client once. Please double-check and let me know if I'm misinterpreting your suggestion.







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488987937



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertOverwriteCommitActionExecutor.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.ArrayList;
+import java.util.stream.Stream;
+
+public class InsertOverwriteCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends CommitActionExecutor<T> {
+
+  private static final Logger LOG = LogManager.getLogger(InsertOverwriteCommitActionExecutor.class);
+
+  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+
+  public InsertOverwriteCommitActionExecutor(JavaSparkContext jsc,
+                                             HoodieWriteConfig config, HoodieTable table,
+                                             String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+    super(jsc, config, table, instantTime, WriteOperationType.INSERT_OVERWRITE);
+    this.inputRecordsRDD = inputRecordsRDD;
+  }
+
+  @Override
+  public HoodieWriteMetadata execute() {
+    return WriteHelper.write(instantTime, inputRecordsRDD, jsc, (HoodieTable<T>) table,
+        config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
+  }
+
+  @Override
+  protected Partitioner getPartitioner(WorkloadProfile profile) {
+    return new InsertOverwritePartitioner<>(profile, jsc, table, config);
+  }
+
+  @Override
+  protected String getCommitActionType() {
+    return HoodieTimeline.REPLACE_COMMIT_ACTION;
+  }
+
+  @Override
+  protected JavaRDD<WriteStatus> processInputRecords(JavaRDD<HoodieRecord<T>> inputRecordsRDD, WorkloadProfile profile) {
+    // get all existing fileIds to mark them as replaced
+    JavaRDD<WriteStatus> replaceStatuses = getAllReplaceWriteStatus(profile);

Review comment:
       I refactored it and removed the boolean from WriteStatus. PTAL.







[GitHub] [hudi] bvaradar edited a comment on pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
bvaradar edited a comment on pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#issuecomment-699784611


   @satishkotha : when the conflicts are resolved and comments addressed let me know and I will take a final pass to land.





[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488268777



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/ActionType.java
##########
@@ -22,5 +22,5 @@
  * The supported action types.
  */
 public enum ActionType {
-  commit, savepoint, compaction, clean, rollback
+  commit, savepoint, compaction, clean, rollback, replacecommit

Review comment:
       Filed HUDI-1281 







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488208180



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView
+        .getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+    fileGroupsToDelete.forEach(fg -> {
+      fg.getAllRawFileSlices().forEach(fileSlice -> {
+        fileSlice.getBaseFile().map(baseFile -> deletePath(baseFile.getFileStatus().getPath(), instant));
+        fileSlice.getLogFiles().forEach(logFile -> deletePath(logFile.getPath(), instant));
+      });
+    });
+  }
+
+  /**
+   * Because we are creating new 'HoodieTable' and FileSystemView objects in this class constructor,
+   * partition view may not be loaded correctly.
+   * Reload all partitions modified by REPLACE action
+   *
+   * TODO find a better way to pass the FileSystemView to this class.
+   */
+  private void ensureReplacedPartitionsLoadedCorrectly(HoodieInstant instant, TableFileSystemView fileSystemView) {
+    Option<HoodieInstant> replaceInstantOption = metaClient.getActiveTimeline().getCompletedAndReplaceTimeline()
+        .filter(replaceInstant -> replaceInstant.getTimestamp().equals(instant.getTimestamp())).firstInstant();
+
+    replaceInstantOption.ifPresent(replaceInstant -> {
+      try {
+        HoodieReplaceCommitMetadata metadata = HoodieReplaceCommitMetadata.fromBytes(
+            metaClient.getActiveTimeline().getInstantDetails(replaceInstant).get(),
+            HoodieReplaceCommitMetadata.class);
+
+        metadata.getPartitionToReplaceStats().keySet().forEach(partition -> fileSystemView.getAllFileGroups(partition));
+      } catch (IOException e) {
+        throw new HoodieCommitException("Failed to archive because cannot delete replace files", e);
+      }
+    });
+  }
+
+  private boolean deletePath(Path path, HoodieInstant instant) {
+    try {
+      LOG.info("Deleting " + path + " before archiving " + instant);
+      metaClient.getFs().delete(path);

Review comment:
       metaClient.getFs() seems to follow the singleton pattern, so it doesn't seem expensive to call. Am I reading it incorrectly?
   
   I can work on setting up parallel deletes.
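   
   As a rough illustration of parallel deletes, the sketch below uses a Java parallel stream over local java.nio paths. This is a swapped-in technique for illustration only: the actual change would go through the Hadoop FileSystem and possibly the JavaSparkContext, and ParallelDeleteSketch and deleteAll are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: deletes local files in parallel and reports how many were removed.
final class ParallelDeleteSketch {
  // Returns the number of paths that existed and were deleted.
  static long deleteAll(List<Path> paths) {
    return paths.parallelStream().filter(ParallelDeleteSketch::deleteOne).count();
  }

  private static boolean deleteOne(Path path) {
    try {
      // deleteIfExists returns false when the file is already gone, so retries are harmless.
      return Files.deleteIfExists(path);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to delete " + path, e);
    }
  }

  public static void main(String[] args) throws IOException {
    Path baseFile = Files.createTempFile("replaced-base", ".parquet");
    Path logFile = Files.createTempFile("replaced-log", ".log");
    System.out.println("Deleted " + deleteAll(Arrays.asList(baseFile, logFile)) + " files");
  }
}
```

   Treating an already-missing file as a non-error keeps the operation safe to repeat if archiving is retried after a partial failure.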







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488208571



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView
+        .getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+    fileGroupsToDelete.forEach(fg -> {
+      fg.getAllRawFileSlices().forEach(fileSlice -> {
+        fileSlice.getBaseFile().map(baseFile -> deletePath(baseFile.getFileStatus().getPath(), instant));
+        fileSlice.getLogFiles().forEach(logFile -> deletePath(logFile.getPath(), instant));
+      });
+    });
+  }
+
+  /**
+   * Because we are creating new 'HoodieTable' and FileSystemView objects in this class constructor,
+   * partition view may not be loaded correctly.
+   * Reload all partitions modified by REPLACE action
+   *
+   * TODO find a better way to pass the FileSystemView to this class.
+   */
+  private void ensureReplacedPartitionsLoadedCorrectly(HoodieInstant instant, TableFileSystemView fileSystemView) {
+    Option<HoodieInstant> replaceInstantOption = metaClient.getActiveTimeline().getCompletedAndReplaceTimeline()
+        .filter(replaceInstant -> replaceInstant.getTimestamp().equals(instant.getTimestamp())).firstInstant();
+
+    replaceInstantOption.ifPresent(replaceInstant -> {

Review comment:
       Correct, I'll restructure this code.

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {

Review comment:
       Correct, I will add it.







[GitHub] [hudi] bvaradar commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r494731329



##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -586,24 +602,39 @@ public String startCommit() {
    * @param instantTime Instant time to be generated
    */
   public void startCommitWithTime(String instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);

Review comment:
       My bad. Got confused with the method naming :) 







[GitHub] [hudi] bvaradar commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r489225350



##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteResult.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Result of a write operation.
+ */
+public class HoodieWriteResult implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private static final long RANDOM_SEED = 9038412832L;

Review comment:
       Is this needed ?

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -370,4 +430,17 @@ private IndexedRecord convertToAvroRecord(HoodieTimeline commitTimeline, HoodieI
     avroMetaData.getExtraMetadata().put(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, "");
     return avroMetaData;
   }
+
+  public static org.apache.hudi.avro.model.HoodieReplaceCommitMetadata convertReplaceCommitMetadata(

Review comment:
       Move this to a ReplaceHelper class?

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView
+        .getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+    fileGroupsToDelete.forEach(fg -> {
+      fg.getAllRawFileSlices().forEach(fileSlice -> {
+        fileSlice.getBaseFile().map(baseFile -> deletePath(baseFile.getFileStatus().getPath(), instant));
+        fileSlice.getLogFiles().forEach(logFile -> deletePath(logFile.getPath(), instant));
+      });
+    });
+  }
+
+  /**
+   * Because we are creating new 'HoodieTable' and FileSystemView objects in this class constructor,
+   * partition view may not be loaded correctly.
+   * Reload all partitions modified by REPLACE action
+   *
+   * TODO find a better way to pass the FileSystemView to this class.
+   */
+  private void ensureReplacedPartitionsLoadedCorrectly(HoodieInstant instant, TableFileSystemView fileSystemView) {
+    Option<HoodieInstant> replaceInstantOption = metaClient.getActiveTimeline().getCompletedAndReplaceTimeline()
+        .filter(replaceInstant -> replaceInstant.getTimestamp().equals(instant.getTimestamp())).firstInstant();
+
+    replaceInstantOption.ifPresent(replaceInstant -> {
+      try {
+        HoodieReplaceCommitMetadata metadata = HoodieReplaceCommitMetadata.fromBytes(
+            metaClient.getActiveTimeline().getInstantDetails(replaceInstant).get(),
+            HoodieReplaceCommitMetadata.class);
+
+        metadata.getPartitionToReplaceStats().keySet().forEach(partition -> fileSystemView.getAllFileGroups(partition));
+      } catch (IOException e) {
+        throw new HoodieCommitException("Failed to archive because cannot delete replace files", e);
+      }
+    });
+  }
+
+  private boolean deletePath(Path path, HoodieInstant instant) {
+    try {
+      LOG.info("Deleting " + path + " before archiving " + instant);
+      metaClient.getFs().delete(path);

Review comment:
       This could become a performance issue when we are deleting a lot of replaced files. The HoodieTimelineArchiveLog.archive() method takes a JavaSparkContext, right?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
##########
@@ -251,6 +262,28 @@ private void addRollbackInstant(HoodieTimeline timeline, HoodieInstant instant)
     LOG.info("Done Syncing rollback instant (" + instant + ")");
   }
 
+  /**
+   * Add newly found REPLACE instant.
+   *
+   * @param timeline Hoodie Timeline
+   * @param instant REPLACE Instant
+   */
+  private void addReplaceInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {

Review comment:
       @satishkotha: We need to handle the case where a .replace instant is restored in the incremental timeline file-system view.
   
   About the implementation:
   In addRestoreInstant(), we need to look at HoodieRestoreMetadata.instantsToRollback and, for each instant of the .replace type, remove the replaced file-group mapping kept in the file-system view. We would need a reverse mapping from instant to file-group ID, and also a way to identify which of the entries in HoodieRestoreMetadata.instantsToRollback are replace metadata. Currently, we only store commit timestamps in HoodieRestoreMetadata.instantsToRollback.
   I think it would be useful to add an additional field to HoodieRestoreCommitMetadata and HoodieRollbackCommitMetadata that stores both the timestamp and the commit action type, and use it here.
   
   Since we only read committed replace actions, rollback is fine though.
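   
   The idea above can be sketched roughly as follows, assuming each rolled-back instant carries both its timestamp and its action type. ReplaceRestoreSketch and InstantInfo are hypothetical names, the maps are plain in-memory stand-ins for the view's state, and "replace" is an assumed action name.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: undo replace mappings when a restore rolls back a replace instant.
final class ReplaceRestoreSketch {
  // Assumed shape of an instantsToRollback entry once it records the action type too.
  static final class InstantInfo {
    final String timestamp;
    final String action;

    InstantInfo(String timestamp, String action) {
      this.timestamp = timestamp;
      this.action = action;
    }
  }

  // Forward mapping: file group id -> replace instant that superseded it.
  private final Map<String, String> replacedFileGroupToInstant = new HashMap<>();
  // Reverse mapping: replace instant -> file group ids it replaced.
  private final Map<String, Set<String>> instantToReplacedFileGroups = new HashMap<>();

  void addReplaceInstant(String instantTime, Collection<String> replacedFileGroupIds) {
    instantToReplacedFileGroups
        .computeIfAbsent(instantTime, k -> new HashSet<>())
        .addAll(replacedFileGroupIds);
    replacedFileGroupIds.forEach(id -> replacedFileGroupToInstant.put(id, instantTime));
  }

  // Only entries whose action is "replace" need custom handling.
  void addRestoreInstant(List<InstantInfo> instantsToRollback) {
    for (InstantInfo info : instantsToRollback) {
      if ("replace".equals(info.action)) {
        removeReplaceInstant(info.timestamp);
      }
    }
  }

  void removeReplaceInstant(String instantTime) {
    Set<String> fileGroupIds = instantToReplacedFileGroups.remove(instantTime);
    if (fileGroupIds != null) {
      fileGroupIds.forEach(id -> replacedFileGroupToInstant.remove(id));
    }
  }

  boolean isReplaced(String fileGroupId) {
    return replacedFileGroupToInstant.containsKey(fileGroupId);
  }

  public static void main(String[] args) {
    ReplaceRestoreSketch view = new ReplaceRestoreSketch();
    view.addReplaceInstant("20200827163927", Arrays.asList("fg1", "fg2"));
    view.addRestoreInstant(Arrays.asList(new InstantInfo("20200827163927", "replace")));
    System.out.println(view.isReplaced("fg1"));
  }
}
```

   Because removal is keyed by instant time and ignores unknown instants, calling it again after a partially failed restore does no harm.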

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView
+        .getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+    fileGroupsToDelete.forEach(fg -> {

Review comment:
       +1

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * All the metadata that gets stored along with a commit.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class HoodieReplaceCommitMetadata extends HoodieCommitMetadata {
+  private static final Logger LOG = LogManager.getLogger(HoodieReplaceCommitMetadata.class);
+  protected Map<String, List<String>> partitionToReplaceFileIds;
+
+  // for ser/deser
+  public HoodieReplaceCommitMetadata() {
+    this(false);
+  }
+
+  public HoodieReplaceCommitMetadata(boolean compacted) {
+    super(compacted);
+    partitionToReplaceFileIds = new HashMap<>();
+  }
+
+  public void setPartitionToReplaceFileIds(Map<String, List<String>> partitionToReplaceFileIds) {
+    this.partitionToReplaceFileIds = partitionToReplaceFileIds;
+  }
+
+  public void addReplaceFileId(String partitionPath, String fileId) {
+    if (!partitionToReplaceFileIds.containsKey(partitionPath)) {
+      partitionToReplaceFileIds.put(partitionPath, new ArrayList<>());
+    }
+    partitionToReplaceFileIds.get(partitionPath).add(fileId);
+  }
+
+  public List<String> getReplaceFileIds(String partitionPath) {
+    return partitionToReplaceFileIds.get(partitionPath);
+  }
+
+  public Map<String, List<String>> getPartitionToReplaceFileIds() {
+    return partitionToReplaceFileIds;
+  }
+
+  public String toJsonString() throws IOException {

Review comment:
       nit: Can you add the @Override annotation?
   

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
##########
@@ -251,6 +262,28 @@ private void addRollbackInstant(HoodieTimeline timeline, HoodieInstant instant)
     LOG.info("Done Syncing rollback instant (" + instant + ")");
   }
 
+  /**
+   * Add newly found REPLACE instant.
+   *
+   * @param timeline Hoodie Timeline
+   * @param instant REPLACE Instant
+   */
+  private void addReplaceInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {

Review comment:
       Also, can you add test cases for the incremental file-system view for both addReplaceInstant and removeReplaceInstant in TestIncrementalFSViewSync?

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView

Review comment:
       Only commit instants older than the oldest pending compaction are allowed to be archived. But if we encode the entire file group in the replace metadata, we will have race conditions with pending compactions. So I think it is safer to figure out the file group at archival time, when it is guaranteed that the pending compaction is done.
   
   Regarding the need for ensureReplacedPartitionsLoadedCorrectly: if you look at pending compaction handling in the filesystem view, pending compactions are eagerly loaded whenever we construct the view. This seems to also be the case for replace metadata. So why do we need to trigger loading from outside?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] satishkotha commented on pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#issuecomment-685982272


   @bvaradar I addressed your comments. I also created follow-up items at https://issues.apache.org/jira/browse/HUDI-868, as you suggested.
   
    Please take another look when possible. 





[GitHub] [hudi] bvaradar commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r493002158



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
##########
@@ -251,6 +262,28 @@ private void addRollbackInstant(HoodieTimeline timeline, HoodieInstant instant)
     LOG.info("Done Syncing rollback instant (" + instant + ")");
   }
 
+  /**
+   * Add newly found REPLACE instant.
+   *
+   * @param timeline Hoodie Timeline
+   * @param instant REPLACE Instant
+   */
+  private void addReplaceInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {

Review comment:
       Discussed with @satishkotha. We will track the commit action type along with the instant, since we have introduced another type of "commit": replace. Restore can then perform custom handling only for this action type. It is safer to let the incremental file-system view revert the replace mappings (in memory or RocksDB) given the replace instant time, as that is robust in case of partial restore failures.
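
To illustrate reverting replace mappings by instant time, here is a minimal, self-contained sketch. `ReplacedFileGroupTracker` and its methods are hypothetical names, not Hudi classes, and the `HashMap` is only a stand-in for whatever in-memory or RocksDB state the view keeps.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory tracker for illustration only; not the Hudi implementation. */
final class ReplacedFileGroupTracker {
  // Key: "partitionPath/fileId" (illustrative encoding).
  // Value: instant time of the replace action that replaced the file group.
  private final Map<String, String> replacedAt = new HashMap<>();

  /** Records that the file group was replaced by the given replace instant. */
  void addReplaced(String partitionPath, String fileId, String replaceInstantTime) {
    replacedAt.put(partitionPath + "/" + fileId, replaceInstantTime);
  }

  /** Drops every mapping recorded by any of the given replace instants. */
  void removeReplacedAtInstants(Set<String> instantTimes) {
    replacedAt.values().removeIf(instantTimes::contains);
  }

  /** Returns true if the file group is currently tracked as replaced. */
  boolean isReplaced(String partitionPath, String fileId) {
    return replacedAt.containsKey(partitionPath + "/" + fileId);
  }
}
```

Because removal is keyed only by instant time, calling `removeReplacedAtInstants` a second time with the same instants is a no-op, which is the property that makes a retry after a partially failed restore safe in this sketch.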







[GitHub] [hudi] satishkotha commented on pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#issuecomment-687391466


   @vinothchandar @bvaradar Addressed most of your suggestions. There are a couple of other follow-up items I need your help on:
   
   1) You suggested removing HoodieReplaceStat. I ran into a minor implementation issue removing it. Basically, HoodieWriteClient operations return JavaRDD<WriteStatus>. SparkSqlWriter uses these WriteStatus objects to create metadata (.commit/.replace etc.). Each WriteStatus comes with a HoodieWriteStat (which is expected to be non-null in many places). This HoodieWriteStat is used for many post-commit operations. So if we want to remove HoodieReplaceStat, we can either
   a) Change the signature of WriteClient operations to return a new structured object instead of just JavaRDD<WriteStatus>. This object would contain the JavaRDD<WriteStatus> for newly created files and a List<HoodieFileGroupId> tracking the file groups replaced. We would have to change post-commit operations to look at this new object instead of WriteStatus.
   OR
   b) Return a WriteStatus for replaced file groups too. WriteClient operations can continue to return JavaRDD<WriteStatus>. Each WriteStatus has a HoodieWriteStat, which can be a token value (null?) for replaced file groups.
   
   Either way, this is a somewhat involved change, so I would like to get your feedback before starting implementation. What do you think?
   
   2) Deleting replaced file groups during archival vs. clean. I have this deletion logic implemented in archival, per our earlier conversation. But, as I mentioned, this may lead to storage inefficiency. For example, suppose a) clean retention is set to 1 commit and b) archival is done after 24 commits. We then keep all the data for replaced files until archival happens.
   
   Let me know if you guys have any other comments.
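
For reference, option (a) under item 1 could look roughly like the sketch below. This is only an assumption about the shape of such an object: `WriteResultSketch` is a hypothetical name, the type parameter `S` stands in for `JavaRDD<WriteStatus>` so the example does not depend on Spark, and file group IDs are plain strings rather than HoodieFileGroupId.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical result holder for write operations; illustrative only. */
final class WriteResultSketch<S> {
  // Statuses for newly written files (JavaRDD<WriteStatus> in the real client).
  private final S writeStatuses;
  // IDs of file groups that this write replaces; empty for a regular commit.
  private final List<String> replacedFileGroupIds;

  WriteResultSketch(S writeStatuses, List<String> replacedFileGroupIds) {
    this.writeStatuses = writeStatuses;
    // Defensive copy so later changes to the caller's list do not leak in.
    this.replacedFileGroupIds =
        Collections.unmodifiableList(new ArrayList<>(replacedFileGroupIds));
  }

  S getWriteStatuses() {
    return writeStatuses;
  }

  List<String> getReplacedFileGroupIds() {
    return replacedFileGroupIds;
  }

  /** True when at least one file group is replaced by this write. */
  boolean isReplace() {
    return !replacedFileGroupIds.isEmpty();
  }
}
```

With a shape like this, post-commit operations would read `getWriteStatuses()` as before and consult `getReplacedFileGroupIds()` only when `isReplace()` is true, so no placeholder HoodieWriteStat would be needed for replaced file groups.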





[GitHub] [hudi] bvaradar commented on pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
bvaradar commented on pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#issuecomment-683269153


   @satishkotha: How is incremental reading handled? Are we going to support it? If not, are you going to throw exceptions? As we clean up replaced files only during archiving but perform normal cleanup during cleaner operations, there will be cases where file versions of new files get cleaned up before the old file versions are removed.





[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r490461331



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {

Review comment:
       Implemented parallel deletion. PTAL.







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r481364995



##########
File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
##########
@@ -104,4 +104,8 @@ protected SyncableFileSystemView getFileSystemViewWithUnCommittedSlices(HoodieTa
   protected HoodieTableType getTableType() {
     return HoodieTableType.COPY_ON_WRITE;
   }
+
+  protected boolean areTimeTravelTestsEnabled() {

Review comment:
       The RocksDB and remote FileSystemViews are not implemented yet, so I temporarily disabled those tests. Moved this method to the view tests instead of common.

##########
File path: hudi-spark/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
##########
@@ -102,7 +102,7 @@ public void commit(WriterCommitMessage[] messages) {
             .flatMap(m -> m.getWriteStatuses().stream().map(m2 -> m2.getStat())).collect(Collectors.toList());
 
     try {
-      writeClient.commitStats(instantTime, writeStatList, Option.empty());
+      writeClient.commitStats(instantTime, writeStatList, Option.empty(), HoodieTimeline.COMMIT_ACTION); //TODO get action type from HoodieWriterCommitMessage

Review comment:
       Fixed

##########
File path: hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
##########
@@ -89,6 +89,10 @@ public FileSliceHandler(Configuration conf, FileSystemViewManager viewManager) t
         .collect(Collectors.toList());
   }
 
+  public List<String> getExcludeFileGroups(String basePath, String partitionPath) {

Review comment:
       Done







[GitHub] [hudi] bvaradar commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r494731768



##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -586,24 +602,39 @@ public String startCommit() {
    * @param instantTime Instant time to be generated
    */
   public void startCommitWithTime(String instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    startCommitWithTime(instantTime, metaClient.getCommitActionType(), metaClient);
+  }
+
+  /**
+   * Completes a new commit time for a write operation (insert/update/delete) with specified action.
+   */
+  public void startCommitWithTime(String instantTime, String actionType) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    startCommitWithTime(instantTime, actionType, metaClient);
+  }
+
+  /**
+   * Completes a new commit time for a write operation (insert/update/delete) with specified action.
+   */
+  private void startCommitWithTime(String instantTime, String actionType, HoodieTableMetaClient metaClient) {
     // NOTE : Need to ensure that rollback is done before a new commit is started
     if (rollbackPending) {
       // Only rollback inflight commit/delta-commits. Do not touch compaction commits
       rollbackPendingCommits();
     }
-    startCommit(instantTime);
+    startCommit(instantTime, actionType, metaClient);
   }
 
-  private void startCommit(String instantTime) {
-    LOG.info("Generate a new instant time " + instantTime);
-    HoodieTableMetaClient metaClient = createMetaClient(true);
+  private void startCommit(String instantTime, String actionType, HoodieTableMetaClient metaClient) {

Review comment:
       Does this have to be non-static? Can it be moved to CommitUtils?







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r490491745



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView

Review comment:
       So, this is tricky to explain. In FileSystemView, only metadata seems to be eagerly loaded; file groups are not. That is, fetchAllStoredFileGroups() returns empty. For replace instants, we need to get List<FileSlice> for every fileId. Because fetchAllStoredFileGroups() is empty, we also get an empty list of FileSlices, so we don't delete replaced files.
   
   I think passing HoodieTable in from callers, instead of creating a new one in the constructor, would help work around this problem. But that is a somewhat involved change because of test dependencies. Also, it might be better to refresh partition content in case new files were created by compaction or another process and that is somehow not reflected in the table views. This might be the safer option.
   
   Let me know if you want me to work on passing in HoodieTable to HoodieTimelineArchiveLog constructor.
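
The behaviour described above can be reproduced with a small stand-alone sketch. It only models lazy per-partition loading and is not Hudi code: `LazyPartitionViewSketch`, `fetchStoredFileGroups` and `ensurePartitionLoaded` are hypothetical names, and the `storage` map stands in for a file-system listing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical view that loads file groups per partition on demand. */
final class LazyPartitionViewSketch {
  // Stand-in for what a file-system listing would return per partition.
  private final Map<String, List<String>> storage;
  // Partitions whose file groups have been loaded into the view so far.
  private final Map<String, List<String>> loaded = new HashMap<>();

  LazyPartitionViewSketch(Map<String, List<String>> storage) {
    this.storage = storage;
  }

  /** Returns only what is already loaded; empty if the partition was never loaded. */
  List<String> fetchStoredFileGroups(String partition) {
    return loaded.getOrDefault(partition, Collections.emptyList());
  }

  /** Loads the partition from storage the first time it is requested. */
  void ensurePartitionLoaded(String partition) {
    loaded.computeIfAbsent(partition,
        p -> new ArrayList<>(storage.getOrDefault(p, Collections.emptyList())));
  }
}
```

In this model, a caller that reads stored file groups without first calling `ensurePartitionLoaded` sees an empty list and would skip deletion, which mirrors why the archival code triggers loading explicitly.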







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488343582



##########
File path: hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc
##########
@@ -36,6 +36,14 @@
          ],
          "default": null
       },
+      {

Review comment:
       Moved it. Will check if it's possible to add a unit test for backward compatibility.







[GitHub] [hudi] bvaradar commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r493957561



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -554,14 +608,16 @@ protected HoodieBaseFile addBootstrapBaseFileIfPresent(HoodieFileGroupId fileGro
       readLock.lock();
       String partition = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partition);
-      return fetchAllStoredFileGroups(partition).map(fileGroup -> {
-        Option<FileSlice> fileSlice = fileGroup.getLatestFileSliceBeforeOrOn(maxInstantTime);
-        // if the file-group is under construction, pick the latest before compaction instant time.
-        if (fileSlice.isPresent()) {
-          fileSlice = Option.of(fetchMergedFileSlice(fileGroup, fileSlice.get()));
-        }
-        return fileSlice;
-      }).filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
+      return fetchAllStoredFileGroups(partition)
+          .filter(fg -> !isFileGroupReplaced(fg.getFileGroupId()))

Review comment:
       Same case here: we need to use the maxInstantTime passed in, instead of the timeline's maxInstant.
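
A minimal sketch of what an instant-aware check might look like follows. The class and method names are hypothetical, file group IDs are plain strings, and the sketch assumes instant times are fixed-width timestamp strings that order correctly under plain string comparison.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration only; not the Hudi implementation. */
final class TimeTravelReplaceCheck {
  // File group id -> instant time of the replace action that replaced it.
  private final Map<String, String> replacedAt = new HashMap<>();

  void markReplaced(String fileGroupId, String replaceInstantTime) {
    replacedAt.put(fileGroupId, replaceInstantTime);
  }

  /**
   * A file group counts as replaced only if the replace happened at or before
   * the instant the caller is reading as of. Assumes fixed-width instant strings.
   */
  boolean isReplacedBeforeOrOn(String fileGroupId, String maxInstantTime) {
    String replaceInstant = replacedAt.get(fileGroupId);
    return replaceInstant != null && replaceInstant.compareTo(maxInstantTime) <= 0;
  }
}
```

A query as of an instant earlier than the replace would still see the old file group under this check, while a query at or after the replace instant would filter it out.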

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -425,10 +459,14 @@ protected HoodieBaseFile addBootstrapBaseFileIfPresent(HoodieFileGroupId fileGro
       readLock.lock();
       String partitionPath = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partitionPath);
-      return fetchHoodieFileGroup(partitionPath, fileId).map(fileGroup -> fileGroup.getAllBaseFiles()
-          .filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.EQUALS,
-              instantTime)).filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst().orElse(null))
-          .map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, fileId), df));
+      if (isFileGroupReplaced(partitionPath, fileId)) {

Review comment:
       @satishkotha: Don't we need to use the instant time when checking for a replaced file here?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -727,6 +795,26 @@ private String formatPartitionKey(String partitionStr) {
    */
   abstract Stream<HoodieFileGroup> fetchAllStoredFileGroups();
 
+  /**
+   * Track instant time for file groups replaced.
+   */
+  protected abstract void resetReplacedFileGroups(final Map<HoodieFileGroupId, HoodieInstant> replacedFileGroups);
+
+  /**
+   * Track instant time for new file groups replaced.
+   */
+  protected abstract void addReplacedFileGroups(final Map<HoodieFileGroupId, HoodieInstant> replacedFileGroups);
+
+  /**
+   * Remove file groups that are replaced in any of the specified instants.
+   */
+  protected abstract void removeReplacedFileIds(Set<String> instants);

Review comment:
       Rename to removeReplacedFileIdsAtInstants?

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -586,24 +602,39 @@ public String startCommit() {
    * @param instantTime Instant time to be generated
    */
   public void startCommitWithTime(String instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);

Review comment:
       We are creating the metaclient and loading the timeline once here and again in the function called on the next line. Can you make sure the metaclient is created only once, without loading the timeline?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper class to generate compaction plan from FileGroup/FileSlice abstraction.

Review comment:
       Doc needs fixing.

##########
File path: hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
##########
@@ -39,6 +39,18 @@
         "name":"version",
         "type":["int", "null"],
         "default": 1
+     },
+     /* overlaps with 'commitsRollback' field. Adding this to track action type for all the instants being rolled back. */

Review comment:
       Can we move this to a separate avsc file and reference it here?

##########
File path: hudi-common/src/main/avro/HoodieRestoreMetadata.avsc
##########
@@ -34,6 +34,8 @@
         "name":"version",
         "type":["int", "null"],
         "default": 1
-     }
+     },
+     /* overlaps with 'instantsToRollback' field. Adding this to track action type for all the instants being rolled back. */
+     {"name": "restoreInstantInfo", "default": null, "type": {"type": "array", "default": null, "items": ["null", "HoodieInstantInfo"]}}

Review comment:
       Can you use pretty-print mode (multi-line), the same way the other sections are presented?

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -91,40 +92,47 @@ public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses) {
   }
 
   /**
+   *
    * Commit changes performed at the given instantTime marker.
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
-      Option<Map<String, String>> extraMetadata) {
-    List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, stats, extraMetadata);
+                        Option<Map<String, String>> extraMetadata) {
+    HoodieTableMetaClient metaClient = createMetaClient(false);
+    String actionType = metaClient.getCommitActionType();
+    return commit(instantTime, writeStatuses, extraMetadata, actionType, Collections.emptyMap());
   }
 
-  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata) {
-    LOG.info("Committing " + instantTime);
+  /**
+   * Complete changes performed at the given instantTime marker with specified action.
+   */
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
+      Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
+    List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
+    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
+  }
+
+  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
+                             String commitActionType) {
+    return commitStats(instantTime, stats, extraMetadata, commitActionType, Collections.emptyMap());
+  }
+
+  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
+                             String commitActionType, Map<String, List<String>> partitionToReplaceFileIds) {
+    LOG.info("Committing " + instantTime + " action " + commitActionType);
     HoodieTableMetaClient metaClient = createMetaClient(false);

Review comment:
       Is this no longer needed? Can it be removed?

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -586,24 +602,39 @@ public String startCommit() {
    * @param instantTime Instant time to be generated
    */
   public void startCommitWithTime(String instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);

Review comment:
       My bad. Got confused with the method naming :) 

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -586,24 +602,39 @@ public String startCommit() {
    * @param instantTime Instant time to be generated
    */
   public void startCommitWithTime(String instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    startCommitWithTime(instantTime, metaClient.getCommitActionType(), metaClient);
+  }
+
+  /**
+   * Completes a new commit time for a write operation (insert/update/delete) with specified action.
+   */
+  public void startCommitWithTime(String instantTime, String actionType) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    startCommitWithTime(instantTime, actionType, metaClient);
+  }
+
+  /**
+   * Completes a new commit time for a write operation (insert/update/delete) with specified action.
+   */
+  private void startCommitWithTime(String instantTime, String actionType, HoodieTableMetaClient metaClient) {
     // NOTE : Need to ensure that rollback is done before a new commit is started
     if (rollbackPending) {
       // Only rollback inflight commit/delta-commits. Do not touch compaction commits
       rollbackPendingCommits();
     }
-    startCommit(instantTime);
+    startCommit(instantTime, actionType, metaClient);
   }
 
-  private void startCommit(String instantTime) {
-    LOG.info("Generate a new instant time " + instantTime);
-    HoodieTableMetaClient metaClient = createMetaClient(true);
+  private void startCommit(String instantTime, String actionType, HoodieTableMetaClient metaClient) {

Review comment:
       Does this have to be non-static? Can it be moved to CommitUtils?







[GitHub] [hudi] bvaradar commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r479632310



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
##########
@@ -77,6 +79,13 @@ public SpillableMapBasedFileSystemView(HoodieTableMetaClient metaClient, HoodieT
     }
   }
 
+  @Override
+  protected Map<String, Set<String>> createPartitionToExcludeFileGroups() {
+    // TODO should we create another spillable directory under baseStoreDir?
+    // the exclude file group is expected to be small, so use parent class in-memory representation
+    return super.createPartitionToExcludeFileGroups();

Review comment:
       Please follow the same structure as the one we used for compaction.







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r481364462



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceStat.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Statistics about a single Hoodie replace operation.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class HoodieReplaceStat extends HoodieWriteStat {
+
+  // records from the 'getFileId()' can be written to multiple new file groups. This list tracks all new fileIds
+  private List<String> newFileIds;

Review comment:
       It's part of HoodieWriteStat#fileId.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
##########
@@ -355,6 +357,18 @@ public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaC
     }
   }
 
+  @Override
+  public Stream<String> getAllExcludeFileGroups(final String partitionPath) {

Review comment:
       changed

##########
File path: hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
##########
@@ -284,6 +284,13 @@ private void registerFileSlicesAPI() {
       writeValueAsString(ctx, dtos);
     }, true));
 
+    app.get(RemoteHoodieTableFileSystemView.ALL_EXCLUDE_FILEGROUPS_FOR_PARTITION_URL, new ViewHandler(ctx -> {

Review comment:
       changed







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r493882291



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
##########
@@ -251,6 +262,28 @@ private void addRollbackInstant(HoodieTimeline timeline, HoodieInstant instant)
     LOG.info("Done Syncing rollback instant (" + instant + ")");
   }
 
+  /**
+   * Add newly found REPLACE instant.
+   *
+   * @param timeline Hoodie Timeline
+   * @param instant REPLACE Instant
+   */
+  private void addReplaceInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {

Review comment:
       @bvaradar Made the change and added a basic test. Please take a look. If the general approach looks good, I'll add more complex tests.







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r481365985



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,44 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+    Option<HoodieInstant> replaceInstantOption = metaClient.getActiveTimeline().getCompletedAndReplaceTimeline()
+        .filter(replaceInstant -> replaceInstant.getTimestamp().equals(instant.getTimestamp())).firstInstant();
+
+    replaceInstantOption.ifPresent(replaceInstant -> {
+      try {
+        HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
+            metaClient.getActiveTimeline().getInstantDetails(replaceInstant).get(),
+            HoodieCommitMetadata.class);
+
+        metadata.getPartitionToReplaceStats().entrySet().stream().forEach(entry ->
+            deleteFileGroups(entry.getKey(), entry.getValue().stream().map(e -> e.getFileId()).collect(Collectors.toSet()), instant)
+        );
+      } catch (IOException e) {
+        throw new HoodieCommitException("Failed to archive because cannot delete replace files", e);
+      }
+    });
+  }
+
+  private void deleteFileGroups(String partitionPath, Set<String> fileIdsToDelete, HoodieInstant instant) {
+    try {
+      FileStatus[] statuses = metaClient.getFs().listStatus(FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPath));

Review comment:
       Modified to use FileSystemViews







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r481365742



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
##########
@@ -77,6 +79,13 @@ public SpillableMapBasedFileSystemView(HoodieTableMetaClient metaClient, HoodieT
     }
   }
 
+  @Override
+  protected Map<String, Set<String>> createPartitionToExcludeFileGroups() {
+    // TODO should we create another spillable directory under baseStoreDir?
+    // the exclude file group is expected to be small, so use parent class in-memory representation
+    return super.createPartitionToExcludeFileGroups();

Review comment:
       Please take a look and check whether I did it correctly. (To be honest, I don't understand the compaction implementation in great detail.)

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
##########
@@ -148,6 +148,9 @@
    */
   Stream<HoodieFileGroup> getAllFileGroups(String partitionPath);
 
+  Stream<String> getAllExcludeFileGroups(String partitionPath);

Review comment:
       Done







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r495137989



##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -586,24 +602,39 @@ public String startCommit() {
    * @param instantTime Instant time to be generated
    */
   public void startCommitWithTime(String instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    startCommitWithTime(instantTime, metaClient.getCommitActionType(), metaClient);
+  }
+
+  /**
+   * Completes a new commit time for a write operation (insert/update/delete) with specified action.
+   */
+  public void startCommitWithTime(String instantTime, String actionType) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    startCommitWithTime(instantTime, actionType, metaClient);
+  }
+
+  /**
+   * Completes a new commit time for a write operation (insert/update/delete) with specified action.
+   */
+  private void startCommitWithTime(String instantTime, String actionType, HoodieTableMetaClient metaClient) {
     // NOTE : Need to ensure that rollback is done before a new commit is started
     if (rollbackPending) {
       // Only rollback inflight commit/delta-commits. Do not touch compaction commits
       rollbackPendingCommits();
     }
-    startCommit(instantTime);
+    startCommit(instantTime, actionType, metaClient);
   }
 
-  private void startCommit(String instantTime) {
-    LOG.info("Generate a new instant time " + instantTime);
-    HoodieTableMetaClient metaClient = createMetaClient(true);
+  private void startCommit(String instantTime, String actionType, HoodieTableMetaClient metaClient) {

Review comment:
       This is a private method. Do you want to make it public static? Personally, I think having all startCommit methods in HoodieWriteClient makes more sense because the user workflow is
   
   1) writeClient#startCommit
   2) writeClient#upsert
   3) writeClient#commit
   
   But if you have a strong preference to make this part of CommitUtils, I can move it. Let me know.







[GitHub] [hudi] satishkotha edited a comment on pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha edited a comment on pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#issuecomment-687391466


   @vinothchandar @bvaradar Addressed most of your suggestions. There are a couple of follow-up items I need your help on:
   
   1) You suggested removing HoodieReplaceStat. I ran into a minor implementation issue when removing it. HoodieWriteClient operations return JavaRDD<WriteStatus>, and SparkSqlWriter uses these WriteStatus objects to create the metadata (.commit/.replace etc). Each WriteStatus comes with a HoodieWriteStat (which is expected to be non-null in many places), and this HoodieWriteStat is used by many post-commit operations. So if we want to remove HoodieReplaceStat, we can either
   a) Change the signature of WriteClient operations to return a new structured object instead of just JavaRDD<WriteStatus>. This object would contain a JavaRDD<WriteStatus> for newly created files and a List<HoodieFileGroupId> for tracking the replaced file groups. We would have to change post-commit operations to look at this new object instead of WriteStatus.
   OR
   b) Return a WriteStatus for replaced file groups too. WriteClient operations can continue to return JavaRDD<WriteStatus>. Each WriteStatus has a HoodieWriteStat, which can be a token value (null?) for replaced file groups.
   
   Either way, this is a somewhat involved change, so I would like to get your feedback before starting implementation. What do you think?
   
   2) Deleting replaced file groups during archival vs. clean. I have implemented this deletion logic in archival per our earlier conversation. But, as I mentioned, this may lead to storage inefficiency. For example, suppose a) clean retention is set to 1 commit and b) archival is done after 24 commits. We would then keep all the data for replaced files until archival happens.
   
   Let me know if you have any other comments.
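
A rough sketch of what option (a) might look like, written against stand-in types so it runs on its own. `WriteResult`, `FileGroupId`, and `partitionToReplacedFileIds` are hypothetical names, not existing Hudi classes or methods, and a plain `List` stands in for the `JavaRDD<WriteStatus>` that the real write client returns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: every type here is a stand-in, not a Hudi class. */
final class WriteResultSketch {

  /** Stand-in for HoodieFileGroupId (partition path plus file id). */
  static final class FileGroupId {
    final String partitionPath;
    final String fileId;

    FileGroupId(String partitionPath, String fileId) {
      this.partitionPath = partitionPath;
      this.fileId = fileId;
    }
  }

  /** Hypothetical return type for option (a); S stands in for WriteStatus. */
  static final class WriteResult<S> {
    final List<S> writeStatuses;                // statuses for newly created files
    final List<FileGroupId> replacedFileGroups; // file groups superseded by this write

    WriteResult(List<S> writeStatuses, List<FileGroupId> replacedFileGroups) {
      this.writeStatuses = Collections.unmodifiableList(new ArrayList<>(writeStatuses));
      this.replacedFileGroups = Collections.unmodifiableList(new ArrayList<>(replacedFileGroups));
    }
  }

  /** Groups replaced file ids by partition, a shape a post-commit step could consume. */
  static Map<String, List<String>> partitionToReplacedFileIds(WriteResult<?> result) {
    Map<String, List<String>> byPartition = new LinkedHashMap<>();
    for (FileGroupId id : result.replacedFileGroups) {
      byPartition.computeIfAbsent(id.partitionPath, p -> new ArrayList<>()).add(id.fileId);
    }
    return byPartition;
  }

  public static void main(String[] args) {
    WriteResult<String> result = new WriteResult<>(
        Arrays.asList("status-for-new-file-1"),
        Arrays.asList(new FileGroupId("americas", "fg-1"), new FileGroupId("americas", "fg-2")));
    System.out.println(partitionToReplacedFileIds(result));
  }
}
```

The point of the wrapper is that replaced file groups never need a placeholder write stat: post-commit code that only cares about new files keeps reading `writeStatuses`, and only replace-aware code looks at `replacedFileGroups`.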





[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488343363



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper class to generate compaction plan from FileGroup/FileSlice abstraction.
+ */
+public class CommitUtils {
+
+  private static final Logger LOG = LogManager.getLogger(CommitUtils.class);
+
+  public static HoodieCommitMetadata buildWriteActionMetadata(List<HoodieWriteStat> writeStats,

Review comment:
       Added







[GitHub] [hudi] satishkotha commented on pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#issuecomment-682247412


   @vinothchandar @bvaradar FYI. There are a few things that I'm not fully happy with, but I would like to get initial feedback and agreement on the high-level approach.





[GitHub] [hudi] bvaradar commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r479634813



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,44 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+    Option<HoodieInstant> replaceInstantOption = metaClient.getActiveTimeline().getCompletedAndReplaceTimeline()
+        .filter(replaceInstant -> replaceInstant.getTimestamp().equals(instant.getTimestamp())).firstInstant();
+
+    replaceInstantOption.ifPresent(replaceInstant -> {
+      try {
+        HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
+            metaClient.getActiveTimeline().getInstantDetails(replaceInstant).get(),
+            HoodieCommitMetadata.class);
+
+        metadata.getPartitionToReplaceStats().entrySet().stream().forEach(entry ->
+            deleteFileGroups(entry.getKey(), entry.getValue().stream().map(e -> e.getFileId()).collect(Collectors.toSet()), instant)
+        );
+      } catch (IOException e) {
+        throw new HoodieCommitException("Failed to archive because cannot delete replace files", e);
+      }
+    });
+  }
+
+  private void deleteFileGroups(String partitionPath, Set<String> fileIdsToDelete, HoodieInstant instant) {
+    try {
+      FileStatus[] statuses = metaClient.getFs().listStatus(FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPath));

Review comment:
       Instead of calling listStatus directly, can you use the FileSystemView abstraction to get the file group and then delete each file in it? This way, once consolidated metadata becomes available, you can take advantage of it. cc @prashantwason
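
A minimal sketch of the shape this suggestion implies, using stand-in types rather than Hudi's real classes. `FileGroupView`, `FileDeleter`, and `FileGroup` are hypothetical; the real code would go through the table's file-system view and `FileSystem` instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Illustrative only: stand-in types, not Hudi's file-system view API. */
final class ReplacedFileGroupCleanerSketch {

  /** Stand-in for a file group: its id plus the paths of every file in it. */
  static final class FileGroup {
    final String fileId;
    final List<String> filePaths;

    FileGroup(String fileId, List<String> filePaths) {
      this.fileId = fileId;
      this.filePaths = filePaths;
    }
  }

  /** Stand-in for the view; callers never list the partition directory themselves. */
  interface FileGroupView {
    List<FileGroup> getAllFileGroups(String partitionPath);
  }

  /** Stand-in for the storage delete call. */
  interface FileDeleter {
    boolean delete(String path);
  }

  /** Deletes every file of the selected file groups and returns the paths actually deleted. */
  static List<String> deleteFileGroups(FileGroupView view, FileDeleter deleter,
                                       String partitionPath, Set<String> fileIdsToDelete) {
    List<String> deleted = new ArrayList<>();
    for (FileGroup fileGroup : view.getAllFileGroups(partitionPath)) {
      if (!fileIdsToDelete.contains(fileGroup.fileId)) {
        continue; // not replaced, leave it alone
      }
      for (String path : fileGroup.filePaths) {
        if (deleter.delete(path)) {
          deleted.add(path);
        }
      }
    }
    return deleted;
  }
}
```

Because the caller only depends on the view interface, swapping a directory listing for a metadata-backed lookup would not change `deleteFileGroups` at all.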







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488284655



##########
File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
##########
@@ -200,6 +199,14 @@ public static void createInflightCommitFiles(String basePath, String... instantT
     }
   }
 
+  public static HoodieWriteStat createReplaceStat(final String partitionPath, final String fileId1) {

Review comment:
       Done.







[GitHub] [hudi] bvaradar commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r493003513



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -738,7 +799,9 @@ private String formatPartitionKey(String partitionStr) {
    * @param commitsToReturn Commits
    */
   Stream<FileSlice> fetchLatestFileSliceInRange(List<String> commitsToReturn) {
-    return fetchAllStoredFileGroups().map(fileGroup -> fileGroup.getLatestFileSliceInRange(commitsToReturn))
+    return fetchAllStoredFileGroups()
+        .filter(fileGroup -> !isFileGroupReplacedBeforeAny(fileGroup, commitsToReturn))

Review comment:
       @satishkotha: As discussed, all the replace filtering needs to move to the getXXX() APIs, as the fetch APIs are only responsible for fetching file slices/base files from the different types of storage.
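
The layering described here could be sketched roughly as follows. This is a simplified stand-in, not the real `AbstractTableFileSystemView`: file groups are reduced to id strings, and `LayeredViewSketch`, `InMemory`, and both method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Illustrative only: fetch methods read storage, get methods apply replace filtering. */
abstract class LayeredViewSketch {
  private final Set<String> replacedFileIds;

  LayeredViewSketch(Set<String> replacedFileIds) {
    this.replacedFileIds = new HashSet<>(replacedFileIds);
  }

  /** Storage-specific: returns whatever is stored, with no replace filtering. */
  abstract Stream<String> fetchAllStoredFileGroupIds();

  /** Storage-agnostic: the one place where replaced file groups are hidden. */
  final Stream<String> getAllFileGroupIds() {
    return fetchAllStoredFileGroupIds().filter(id -> !replacedFileIds.contains(id));
  }

  /** One possible backing store; a spillable or remote store would only override fetch. */
  static final class InMemory extends LayeredViewSketch {
    private final List<String> stored;

    InMemory(List<String> stored, Set<String> replacedFileIds) {
      super(replacedFileIds);
      this.stored = stored;
    }

    @Override
    Stream<String> fetchAllStoredFileGroupIds() {
      return stored.stream();
    }
  }

  public static void main(String[] args) {
    LayeredViewSketch view = new InMemory(
        Arrays.asList("fg-1", "fg-2", "fg-3"), new HashSet<>(Arrays.asList("fg-2")));
    System.out.println(view.getAllFileGroupIds().collect(Collectors.toList()));
  }
}
```

Keeping the filter out of the fetch layer means each storage implementation stays unaware of replace semantics, so the rule cannot drift between implementations.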







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488346156



##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -880,6 +880,89 @@ public void testDeletesWithDeleteApi() throws Exception {
     testDeletes(client, updateBatch3.getRight(), 10, file1, "007", 140, keysSoFar);
   }
 
+  /**
+   * Test scenario of writing more file groups than existing number of file groups in partition.
+   */
+  @Test
+  public void testInsertOverwritePartitionHandlingWithMoreRecords() throws Exception {
+    verifyInsertOverwritePartitionHandling(1000, 3000);
+  }
+
+  /**
+   * Test scenario of writing fewer file groups than existing number of file groups in partition.
+   */
+  @Test
+  public void testInsertOverwritePartitionHandlingWithFewerRecords() throws Exception {
+    verifyInsertOverwritePartitionHandling(3000, 1000);
+  }
+
+  /**
+   * Test scenario of writing similar number file groups in partition.
+   */
+  @Test
+  public void testInsertOverwritePartitionHandlinWithSimilarNumberOfRecords() throws Exception {
+    verifyInsertOverwritePartitionHandling(3000, 3000);
+  }
+
+  /**
+   *  1) Do write1 (upsert) with 'batch1RecordsCount' number of records.
+   *  2) Do write2 (insert overwrite) with 'batch2RecordsCount' number of records.
+   *
+   *  Verify that all records in step1 are overwritten
+   */
+  private void verifyInsertOverwritePartitionHandling(int batch1RecordsCount, int batch2RecordsCount) throws Exception {
+    final String testPartitionPath = "americas";
+    HoodieWriteConfig config = getSmallInsertWriteConfig(2000);
+    HoodieWriteClient client = getHoodieWriteClient(config, false);
+    dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
+
+    // Do Inserts
+    String commitTime1 = "001";
+    client.startCommitWithTime(commitTime1);
+    List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, batch1RecordsCount);
+    JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 2);
+    List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
+    assertNoWriteErrors(statuses);
+    Set<String> batch1Buckets = statuses.stream().map(s -> s.getFileId()).collect(Collectors.toSet());
+    verifyParquetFileData(commitTime1, inserts1, statuses);
+
+    // Do Insert Overwrite
+    String commitTime2 = "002";
+    client.startCommitWithTime(commitTime2, HoodieTimeline.REPLACE_COMMIT_ACTION);
+    List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, batch2RecordsCount);
+    List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>();
+    insertsAndUpdates2.addAll(inserts2);
+    JavaRDD<HoodieRecord> insertAndUpdatesRDD2 = jsc.parallelize(insertsAndUpdates2, 2);
+    statuses = client.insertOverwrite(insertAndUpdatesRDD2, commitTime2).collect();
+    assertNoWriteErrors(statuses);
+    Set<String> replacedBuckets = statuses.stream().filter(s -> s.isReplacedFileId())
+        .map(s -> s.getFileId()).collect(Collectors.toSet());
+    assertEquals(batch1Buckets, replacedBuckets);
+    List<WriteStatus> newBuckets = statuses.stream().filter(s -> !(s.isReplacedFileId()))
+        .collect(Collectors.toList());
+    verifyParquetFileData(commitTime2, inserts2, newBuckets);
+  }
+
+  /**
+   * Verify data in parquet files matches expected records and commit time.
+   */
+  private void verifyParquetFileData(String commitTime, List<HoodieRecord> expectedRecords, List<WriteStatus> allStatus) {

Review comment:
       Renamed it for now. I'll look into whether there are any other helpers for doing this.







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488209685



##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -576,7 +592,8 @@ public String startCommit() {
       rollbackPendingCommits();
     }
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
-    startCommit(instantTime);
+    HoodieTableMetaClient metaClient = createMetaClient(true);

Review comment:
       sure







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488344859



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * All the metadata that gets stored along with a commit.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class HoodieReplaceCommitMetadata extends HoodieCommitMetadata {
+  private static final Logger LOG = LogManager.getLogger(HoodieReplaceCommitMetadata.class);
+  protected Map<String, List<HoodieWriteStat>> partitionToReplaceStats;

Review comment:
       I changed it to List<String> to include only fileIds. I'm inclined against storing all file slices because they can evolve between metadata creation and archival/clean. Let me know if this understanding is incorrect.







[GitHub] [hudi] bvaradar commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r493957561



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -554,14 +608,16 @@ protected HoodieBaseFile addBootstrapBaseFileIfPresent(HoodieFileGroupId fileGro
       readLock.lock();
       String partition = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partition);
-      return fetchAllStoredFileGroups(partition).map(fileGroup -> {
-        Option<FileSlice> fileSlice = fileGroup.getLatestFileSliceBeforeOrOn(maxInstantTime);
-        // if the file-group is under construction, pick the latest before compaction instant time.
-        if (fileSlice.isPresent()) {
-          fileSlice = Option.of(fetchMergedFileSlice(fileGroup, fileSlice.get()));
-        }
-        return fileSlice;
-      }).filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
+      return fetchAllStoredFileGroups(partition)
+          .filter(fg -> !isFileGroupReplaced(fg.getFileGroupId()))

Review comment:
       Same case here: we need to use the maxInstantTime passed in instead of the timeline's maxInstant.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -425,10 +459,14 @@ protected HoodieBaseFile addBootstrapBaseFileIfPresent(HoodieFileGroupId fileGro
       readLock.lock();
       String partitionPath = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partitionPath);
-      return fetchHoodieFileGroup(partitionPath, fileId).map(fileGroup -> fileGroup.getAllBaseFiles()
-          .filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.EQUALS,
-              instantTime)).filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst().orElse(null))
-          .map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, fileId), df));
+      if (isFileGroupReplaced(partitionPath, fileId)) {

Review comment:
       @satishkotha: Don't we need to use the instant time when checking for a replaced file here?
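
One way to read the two comments above is that the replaced check should be evaluated as of the instant the caller asked about. A minimal sketch under two assumptions: instant times are fixed-width timestamp strings that order lexicographically, and the view tracks which instant replaced each file group. `ReplacedAsOfSketch`, `markReplaced`, and `isReplacedAsOf` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: a time-aware replaced check keyed by file id. */
final class ReplacedAsOfSketch {
  // fileId -> instant time of the replace action that superseded it
  private final Map<String, String> replacedAt = new HashMap<>();

  void markReplaced(String fileId, String replaceInstantTime) {
    replacedAt.put(fileId, replaceInstantTime);
  }

  /**
   * True only if the file group was replaced at or before maxInstantTime.
   * A reader looking at an earlier instant must still see the old file group.
   */
  boolean isReplacedAsOf(String fileId, String maxInstantTime) {
    String replaceInstantTime = replacedAt.get(fileId);
    return replaceInstantTime != null && replaceInstantTime.compareTo(maxInstantTime) <= 0;
  }

  public static void main(String[] args) {
    ReplacedAsOfSketch view = new ReplacedAsOfSketch();
    view.markReplaced("fg-1", "005");
    System.out.println(view.isReplacedAsOf("fg-1", "004")); // reader before the replace
    System.out.println(view.isReplacedAsOf("fg-1", "005")); // reader at the replace
  }
}
```

A check that ignores the caller's instant would hide the old file group from readers that are pinned to a point before the replace.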

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -727,6 +795,26 @@ private String formatPartitionKey(String partitionStr) {
    */
   abstract Stream<HoodieFileGroup> fetchAllStoredFileGroups();
 
+  /**
+   * Track instant time for file groups replaced.
+   */
+  protected abstract void resetReplacedFileGroups(final Map<HoodieFileGroupId, HoodieInstant> replacedFileGroups);
+
+  /**
+   * Track instant time for new file groups replaced.
+   */
+  protected abstract void addReplacedFileGroups(final Map<HoodieFileGroupId, HoodieInstant> replacedFileGroups);
+
+  /**
+   * Remove file groups that are replaced in any of the specified instants.
+   */
+  protected abstract void removeReplacedFileIds(Set<String> instants);

Review comment:
       Rename to removeReplacedFileIdsAtInstants?

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -586,24 +602,39 @@ public String startCommit() {
    * @param instantTime Instant time to be generated
    */
   public void startCommitWithTime(String instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);

Review comment:
       We are creating the metaclient and loading the timeline once here and again in the function called on the next line. Can you make sure the metaclient is created only once, without loading the timeline?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper class to generate compaction plan from FileGroup/FileSlice abstraction.

Review comment:
       Doc needs fixing.

##########
File path: hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
##########
@@ -39,6 +39,18 @@
         "name":"version",
         "type":["int", "null"],
         "default": 1
+     },
+     /* overlaps with 'commitsRollback' field. Adding this to track action type for all the instants being rolled back. */

Review comment:
       Can we move this to a separate avsc file and reference it here?

##########
File path: hudi-common/src/main/avro/HoodieRestoreMetadata.avsc
##########
@@ -34,6 +34,8 @@
         "name":"version",
         "type":["int", "null"],
         "default": 1
-     }
+     },
+     /* overlaps with 'instantsToRollback' field. Adding this to track action type for all the instants being rolled back. */
+     {"name": "restoreInstantInfo", "default": null, "type": {"type": "array", "default": null, "items": ["null", "HoodieInstantInfo"]}}

Review comment:
       Can you use pretty-print mode (multi-line), the same way the other sections are presented?

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -91,40 +92,47 @@ public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses) {
   }
 
   /**
+   *
    * Commit changes performed at the given instantTime marker.
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
-      Option<Map<String, String>> extraMetadata) {
-    List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, stats, extraMetadata);
+                        Option<Map<String, String>> extraMetadata) {
+    HoodieTableMetaClient metaClient = createMetaClient(false);
+    String actionType = metaClient.getCommitActionType();
+    return commit(instantTime, writeStatuses, extraMetadata, actionType, Collections.emptyMap());
   }
 
-  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata) {
-    LOG.info("Committing " + instantTime);
+  /**
+   * Complete changes performed at the given instantTime marker with specified action.
+   */
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
+      Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
+    List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
+    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
+  }
+
+  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
+                             String commitActionType) {
+    return commitStats(instantTime, stats, extraMetadata, commitActionType, Collections.emptyMap());
+  }
+
+  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
+                             String commitActionType, Map<String, List<String>> partitionToReplaceFileIds) {
+    LOG.info("Committing " + instantTime + " action " + commitActionType);
     HoodieTableMetaClient metaClient = createMetaClient(false);

Review comment:
       This is no longer needed and can be removed?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bvaradar commented on pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
bvaradar commented on pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#issuecomment-696208094


   @satishkotha : Please ping me in the PR  when you have updates and I can give incremental comments if needed.
   





[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r490461178



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView
+        .getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+    fileGroupsToDelete.forEach(fg -> {
+      fg.getAllRawFileSlices().forEach(fileSlice -> {
+        fileSlice.getBaseFile().map(baseFile -> deletePath(baseFile.getFileStatus().getPath(), instant));
+        fileSlice.getLogFiles().forEach(logFile -> deletePath(logFile.getPath(), instant));
+      });
+    });
+  }
+
+  /**
+   * Because we are creating new 'HoodieTable' and FileSystemView objects in this class constructor,
+   * partition view may not be loaded correctly.
+   * Reload all partitions modified by REPLACE action
+   *
+   * TODO find a better way to pass the FileSystemView to this class.
+   */
+  private void ensureReplacedPartitionsLoadedCorrectly(HoodieInstant instant, TableFileSystemView fileSystemView) {
+    Option<HoodieInstant> replaceInstantOption = metaClient.getActiveTimeline().getCompletedAndReplaceTimeline()
+        .filter(replaceInstant -> replaceInstant.getTimestamp().equals(instant.getTimestamp())).firstInstant();
+
+    replaceInstantOption.ifPresent(replaceInstant -> {
+      try {
+        HoodieReplaceCommitMetadata metadata = HoodieReplaceCommitMetadata.fromBytes(
+            metaClient.getActiveTimeline().getInstantDetails(replaceInstant).get(),
+            HoodieReplaceCommitMetadata.class);
+
+        metadata.getPartitionToReplaceStats().keySet().forEach(partition -> fileSystemView.getAllFileGroups(partition));
+      } catch (IOException e) {
+        throw new HoodieCommitException("Failed to archive because cannot delete replace files", e);
+      }
+    });
+  }
+
+  private boolean deletePath(Path path, HoodieInstant instant) {
+    try {
+      LOG.info("Deleting " + path + " before archiving " + instant);
+      metaClient.getFs().delete(path);

Review comment:
       I missed that we already have a JavaSparkContext. Implemented parallel cleanup. PTAL.







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488344523



##########
File path: hudi-common/src/main/avro/HoodieReplaceCommitMetadata.avsc
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+   "namespace":"org.apache.hudi.avro.model",
+   "type":"record",
+   "name":"HoodieReplaceCommitMetadata",
+   "fields":[
+      {
+         "name":"partitionToWriteStats",

Review comment:
       Moved partitionToReplaceStats to the end. It's also possible to make this reference HoodieCommitMetadata directly, i.e.,
   
   HoodieReplaceCommitMetadata contains HoodieCommitMetadata and replaceFileIds. But converting between the json replaceCommit and the avro version would require another layer of transform code. So, to keep it simple, I copied all the fields from the json structure.







[GitHub] [hudi] vinothchandar commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r486369440



##########
File path: hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -87,44 +88,55 @@ protected AbstractHoodieWriteClient(JavaSparkContext jsc, HoodieIndex index, Hoo
    * Commit changes performed at the given instantTime marker.
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses) {
-    return commit(instantTime, writeStatuses, Option.empty());
+    HoodieTableMetaClient metaClient = createMetaClient(false);
+    String actionType = metaClient.getCommitActionType();
+    return commit(instantTime, writeStatuses, Option.empty(), actionType);

Review comment:
       can't we just call `commit(String, JavaRDD, Option.empty())` without having to implement this?







[GitHub] [hudi] bvaradar commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r489099999



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
##########
@@ -371,6 +371,47 @@ void removeBootstrapBaseFileMapping(Stream<BootstrapBaseFileMapping> bootstrapBa
         schemaHelper.getPrefixForSliceViewByPartitionFile(partitionPath, fileId)).map(Pair::getValue)).findFirst());
   }
 
+  @Override
+  protected void resetReplacedFileGroups(final Map<HoodieFileGroupId, HoodieInstant> replacedFileGroups) {

Review comment:
       This part looks good. Going through rest of the code







[GitHub] [hudi] vinothchandar commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r487541865



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -95,6 +93,13 @@ public HoodieWriteMetadata execute(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
       saveWorkloadProfileMetadataToInflight(profile, instantTime);
     }
 
+    JavaRDD<WriteStatus> writeStatusRDD = processInputRecords(inputRecordsRDD, profile);
+    HoodieWriteMetadata result = new HoodieWriteMetadata();
+    updateIndexAndCommitIfNeeded(writeStatusRDD, result);
+    return result;
+  }
+
+  protected JavaRDD<WriteStatus> processInputRecords(JavaRDD<HoodieRecord<T>> inputRecordsRDD, WorkloadProfile profile) {

Review comment:
       rename: writeInputRecords() 

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -213,6 +213,12 @@ public abstract HoodieWriteMetadata insertPrepped(JavaSparkContext jsc, String i
   public abstract HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instantTime,
       JavaRDD<HoodieRecord<T>> preppedRecords,  Option<BulkInsertPartitioner> bulkInsertPartitioner);
 
+  /**
+   * Logically delete all existing records and Insert a batch of new records into Hoodie table at the supplied instantTime.
+   */
+  public abstract HoodieWriteMetadata insertOverwrite(JavaSparkContext jsc, String instantTime,

Review comment:
       I think at the HoodieTable level, the API has to be about replacing file groups and not insertOverwrite (which can be limited to the WriteClient level). This way clustering can also use the same method, to build on top. 
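
The layering suggested above could look roughly like the sketch below. It is only an illustration under stated assumptions: `Table`, `replaceFileGroups` and the in-memory map are hypothetical stand-ins, not Hudi classes or signatures. The point is that the table-level primitive only knows about swapping file groups, and insert-overwrite is built on top of it, so clustering could reuse the same primitive.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ReplaceLayeringSketch {

  /** Hypothetical table-level primitive: knows about file groups, not about insert-overwrite. */
  static final class Table {
    // fileGroupId -> records held by that file group (in-memory stand-in for storage)
    final Map<String, List<String>> fileGroups = new TreeMap<>();
    private int nextId = 0;

    /** Drops the named file groups and writes the new records into one fresh file group. */
    void replaceFileGroups(Collection<String> replacedIds, List<String> newRecords) {
      fileGroups.keySet().removeAll(replacedIds);
      fileGroups.put("fg-" + (nextId++), new ArrayList<>(newRecords));
    }
  }

  /** Client-level operation: "replace everything that exists" expressed via the primitive. */
  static void insertOverwrite(Table table, List<String> newRecords) {
    // Copy the key set first so the removal does not iterate over itself.
    table.replaceFileGroups(new ArrayList<>(table.fileGroups.keySet()), newRecords);
  }
}
```

A clustering operation would call `replaceFileGroups` with only the file groups it rewrote, rather than all of them.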

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
##########
@@ -378,9 +398,9 @@ public static void createCompactionAuxiliaryMetadata(String basePath, HoodieInst
         new Path(basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instant.getFileName());
     FileSystem fs = FSUtils.getFs(basePath, configuration);
     try (FSDataOutputStream os = fs.create(commitFile, true)) {
-      HoodieCompactionPlan workload = new HoodieCompactionPlan();
+      HoodieCompactionPlan workload = HoodieCompactionPlan.newBuilder().setVersion(1).build();

Review comment:
       Why is this change necessary?

##########
File path: hudi-spark/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
##########
@@ -102,7 +104,9 @@ public void commit(WriterCommitMessage[] messages) {
             .flatMap(m -> m.getWriteStatuses().stream().map(m2 -> m2.getStat())).collect(Collectors.toList());
 
     try {
-      writeClient.commitStats(instantTime, writeStatList, Option.empty());
+      writeClient.commitStats(instantTime, writeStatList, Option.empty(),

Review comment:
       A better approach for these situations generally is to introduce a `commitStats(.)` that does not take the last argument and to deal with it internally inside WriteClient.
   
   This way the code stays more readable, because bulk_insert never has to reference replace stats, which it has nothing to do with. Hope that makes sense.
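
As a minimal sketch of that overload pattern (the class name, the `String` return value and the simplified parameter types are assumptions made for illustration, not the real WriteClient API): the short overload fills in the "nothing replaced" default, so ordinary callers never mention replace-related arguments.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a write client; not the real Hudi class. */
class CommitStatsOverloadSketch {

  /** Caller-facing overload for regular writes: no replace-related argument. */
  static String commitStats(String instantTime, List<String> stats) {
    // Regular writes replace nothing, so the default is supplied internally.
    return commitStats(instantTime, stats, Collections.emptyMap());
  }

  /** Full overload, intended only for replace-style operations. */
  static String commitStats(String instantTime, List<String> stats,
                            Map<String, List<String>> partitionToReplacedFileIds) {
    // Returns a summary string purely so the behaviour is observable in this sketch.
    return instantTime + ":" + stats.size() + ":" + partitionToReplacedFileIds.size();
  }
}
```

A bulk_insert caller would use the two-argument form only; the three-argument form stays an internal detail of replace operations.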

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -87,44 +88,57 @@ protected AbstractHoodieWriteClient(JavaSparkContext jsc, HoodieIndex index, Hoo
    * Commit changes performed at the given instantTime marker.
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses) {
-    return commit(instantTime, writeStatuses, Option.empty());
+    HoodieTableMetaClient metaClient = createMetaClient(false);
+    String actionType = metaClient.getCommitActionType();
+    return commit(instantTime, writeStatuses, Option.empty(), actionType);
+  }
+
+  /**
+   * Complete changes performed at the given instantTime marker with specified action.
+   */
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, String commitActionType) {
+    return commit(instantTime, writeStatuses, Option.empty(), commitActionType);
   }
 
   /**
+   *
    * Commit changes performed at the given instantTime marker.
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
-      Option<Map<String, String>> extraMetadata) {
-    List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, stats, extraMetadata);
+                        Option<Map<String, String>> extraMetadata) {
+    HoodieTableMetaClient metaClient = createMetaClient(false);
+    String actionType = metaClient.getCommitActionType();
+    return commit(instantTime, writeStatuses, extraMetadata, actionType);
   }
 
-  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata) {
-    LOG.info("Committing " + instantTime);
+  /**
+   * Complete changes performed at the given instantTime marker with specified action.
+   */
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,

Review comment:
       note to self: let's see if we can simplify the overloaded commit() methods.

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -87,44 +88,57 @@ protected AbstractHoodieWriteClient(JavaSparkContext jsc, HoodieIndex index, Hoo
    * Commit changes performed at the given instantTime marker.
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses) {
-    return commit(instantTime, writeStatuses, Option.empty());
+    HoodieTableMetaClient metaClient = createMetaClient(false);
+    String actionType = metaClient.getCommitActionType();
+    return commit(instantTime, writeStatuses, Option.empty(), actionType);
+  }
+
+  /**
+   * Complete changes performed at the given instantTime marker with specified action.
+   */
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, String commitActionType) {
+    return commit(instantTime, writeStatuses, Option.empty(), commitActionType);
   }
 
   /**
+   *
    * Commit changes performed at the given instantTime marker.
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
-      Option<Map<String, String>> extraMetadata) {
-    List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, stats, extraMetadata);
+                        Option<Map<String, String>> extraMetadata) {
+    HoodieTableMetaClient metaClient = createMetaClient(false);
+    String actionType = metaClient.getCommitActionType();
+    return commit(instantTime, writeStatuses, extraMetadata, actionType);
   }
 
-  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata) {
-    LOG.info("Committing " + instantTime);
+  /**
+   * Complete changes performed at the given instantTime marker with specified action.
+   */
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
+      Option<Map<String, String>> extraMetadata, String commitActionType) {
+    List<HoodieWriteStat> writeStats = writeStatuses.filter(w -> !w.isReplacedFileId()).map(WriteStatus::getStat).collect();
+    List<HoodieWriteStat> replaceStats = writeStatuses.filter(w -> w.isReplacedFileId()).map(WriteStatus::getStat).collect();
+
+    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, replaceStats);
+  }
+
+  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
+                             String commitActionType, List<HoodieWriteStat> replaceStats) {
+    LOG.info("Committing " + instantTime + " action " + commitActionType);
     HoodieTableMetaClient metaClient = createMetaClient(false);
-    String actionType = metaClient.getCommitActionType();
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
 
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
-    stats.forEach(stat -> metadata.addWriteStat(stat.getPartitionPath(), stat));
-
+    HoodieCommitMetadata metadata = CommitUtils.buildWriteActionMetadata(stats, replaceStats, extraMetadata, operationType, config.getSchema(), commitActionType);

Review comment:
       rename: to just buildMetadata(); commit is already implicit from context.

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -204,41 +212,44 @@ protected void commitOnAutoCommit(HoodieWriteMetadata result) {
   }
 
   protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result) {
-    commit(extraMetadata, result, result.getWriteStatuses().map(WriteStatus::getStat).collect());
+    List<HoodieWriteStat> writeStats = result.getWriteStatuses().filter(w -> !w.isReplacedFileId()).map(WriteStatus::getStat).collect();

Review comment:
       note to self: see if there is a way to avoid repeating this filtering here again 
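
One possible way to avoid the repeated filtering, sketched on a plain `List` rather than a `JavaRDD` (so this assumes the statuses have already been collected once): split them in a single pass with `Collectors.partitioningBy`. The `Status` class is a hypothetical, simplified stand-in for `WriteStatus`.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class StatSplitSketch {

  /** Hypothetical, simplified stand-in for WriteStatus. */
  static final class Status {
    final String fileId;
    final boolean replaced;

    Status(String fileId, boolean replaced) {
      this.fileId = fileId;
      this.replaced = replaced;
    }
  }

  /** Single pass: key true holds replaced file ids, key false holds newly written ones. */
  static Map<Boolean, List<String>> split(List<Status> statuses) {
    return statuses.stream().collect(
        Collectors.partitioningBy((Status s) -> s.replaced,
            Collectors.mapping((Status s) -> s.fileId, Collectors.toList())));
  }
}
```

Both keys are always present in the result of `partitioningBy`, so callers can read `get(true)` and `get(false)` without null checks.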

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -204,41 +212,44 @@ protected void commitOnAutoCommit(HoodieWriteMetadata result) {
   }
 
   protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result) {
-    commit(extraMetadata, result, result.getWriteStatuses().map(WriteStatus::getStat).collect());
+    List<HoodieWriteStat> writeStats = result.getWriteStatuses().filter(w -> !w.isReplacedFileId()).map(WriteStatus::getStat).collect();
+    List<HoodieWriteStat> replacedStats = result.getWriteStatuses().filter(w -> w.isReplacedFileId()).map(WriteStatus::getStat).collect();
+    commit(extraMetadata, result, writeStats, replacedStats);
   }
 
-  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result, List<HoodieWriteStat> stats) {
-    String actionType = table.getMetaClient().getCommitActionType();
+  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result, List<HoodieWriteStat> writeStats, List<HoodieWriteStat> replaceStats) {
+    String actionType = getCommitActionType();
     LOG.info("Committing " + instantTime + ", action Type " + actionType);
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
 
-    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
 
     result.setCommitted(true);
-    stats.forEach(stat -> metadata.addWriteStat(stat.getPartitionPath(), stat));
-    result.setWriteStats(stats);
+    result.setWriteStats(writeStats);
+    result.setReplaceStats(replaceStats);
 
     // Finalize write
-    finalizeWrite(instantTime, stats, result);
-
-    // add in extra metadata
-    if (extraMetadata.isPresent()) {
-      extraMetadata.get().forEach(metadata::addMetadata);
-    }
-    metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, getSchemaToStoreInCommit());
-    metadata.setOperationType(operationType);
+    finalizeWrite(instantTime, writeStats, result);
 
     try {
-      activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime),
-          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-      LOG.info("Committed " + instantTime);
+      HoodieCommitMetadata metadata = writeInstant(writeStats, replaceStats, extraMetadata);
+      result.setCommitMetadata(Option.of(metadata));
     } catch (IOException e) {
       throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
           e);
     }
-    result.setCommitMetadata(Option.of(metadata));
+  }
+
+  private HoodieCommitMetadata writeInstant(List<HoodieWriteStat>  writeStats, List<HoodieWriteStat> replaceStats, Option<Map<String, String>> extraMetadata) throws IOException {

Review comment:
       rename: completeInstant(). It's better to stay close to what the method is doing, using just one terminology.

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {

Review comment:
       rename: deleteReplacedFileGroups() , to be consistent with our terminology 

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -204,41 +212,44 @@ protected void commitOnAutoCommit(HoodieWriteMetadata result) {
   }
 
   protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result) {
-    commit(extraMetadata, result, result.getWriteStatuses().map(WriteStatus::getStat).collect());
+    List<HoodieWriteStat> writeStats = result.getWriteStatuses().filter(w -> !w.isReplacedFileId()).map(WriteStatus::getStat).collect();
+    List<HoodieWriteStat> replacedStats = result.getWriteStatuses().filter(w -> w.isReplacedFileId()).map(WriteStatus::getStat).collect();
+    commit(extraMetadata, result, writeStats, replacedStats);
   }
 
-  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result, List<HoodieWriteStat> stats) {
-    String actionType = table.getMetaClient().getCommitActionType();
+  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result, List<HoodieWriteStat> writeStats, List<HoodieWriteStat> replaceStats) {
+    String actionType = getCommitActionType();
     LOG.info("Committing " + instantTime + ", action Type " + actionType);
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
 
-    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
 
     result.setCommitted(true);
-    stats.forEach(stat -> metadata.addWriteStat(stat.getPartitionPath(), stat));
-    result.setWriteStats(stats);
+    result.setWriteStats(writeStats);
+    result.setReplaceStats(replaceStats);
 
     // Finalize write
-    finalizeWrite(instantTime, stats, result);
-
-    // add in extra metadata
-    if (extraMetadata.isPresent()) {
-      extraMetadata.get().forEach(metadata::addMetadata);
-    }
-    metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, getSchemaToStoreInCommit());
-    metadata.setOperationType(operationType);

Review comment:
       note to self: need to ensure the operation type is properly set.

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView
+        .getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+    fileGroupsToDelete.forEach(fg -> {

Review comment:
       we need to do the deletion in parallel. 

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView
+        .getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+    fileGroupsToDelete.forEach(fg -> {
+      fg.getAllRawFileSlices().forEach(fileSlice -> {
+        fileSlice.getBaseFile().map(baseFile -> deletePath(baseFile.getFileStatus().getPath(), instant));
+        fileSlice.getLogFiles().forEach(logFile -> deletePath(logFile.getPath(), instant));
+      });
+    });
+  }
+
+  /**
+   * Because we are creating new 'HoodieTable' and FileSystemView objects in this class constructor,
+   * partition view may not be loaded correctly.
+   * Reload all partitions modified by REPLACE action
+   *
+   * TODO find a better way to pass the FileSystemView to this class.
+   */
+  private void ensureReplacedPartitionsLoadedCorrectly(HoodieInstant instant, TableFileSystemView fileSystemView) {
+    Option<HoodieInstant> replaceInstantOption = metaClient.getActiveTimeline().getCompletedAndReplaceTimeline()
+        .filter(replaceInstant -> replaceInstant.getTimestamp().equals(instant.getTimestamp())).firstInstant();
+
+    replaceInstantOption.ifPresent(replaceInstant -> {

Review comment:
       This seems like a check for whether the instant is a replacecommit or not. If the instant is completed and of replacecommit type, then we must find the instant here, right?

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView
+        .getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+    fileGroupsToDelete.forEach(fg -> {
+      fg.getAllRawFileSlices().forEach(fileSlice -> {
+        fileSlice.getBaseFile().map(baseFile -> deletePath(baseFile.getFileStatus().getPath(), instant));
+        fileSlice.getLogFiles().forEach(logFile -> deletePath(logFile.getPath(), instant));
+      });
+    });
+  }
+
+  /**
+   * Because we are creating new 'HoodieTable' and FileSystemView objects in this class constructor,
+   * partition view may not be loaded correctly.
+   * Reload all partitions modified by REPLACE action
+   *
+   * TODO find a better way to pass the FileSystemView to this class.
+   */
+  private void ensureReplacedPartitionsLoadedCorrectly(HoodieInstant instant, TableFileSystemView fileSystemView) {
+    Option<HoodieInstant> replaceInstantOption = metaClient.getActiveTimeline().getCompletedAndReplaceTimeline()
+        .filter(replaceInstant -> replaceInstant.getTimestamp().equals(instant.getTimestamp())).firstInstant();
+
+    replaceInstantOption.ifPresent(replaceInstant -> {
+      try {
+        HoodieReplaceCommitMetadata metadata = HoodieReplaceCommitMetadata.fromBytes(
+            metaClient.getActiveTimeline().getInstantDetails(replaceInstant).get(),
+            HoodieReplaceCommitMetadata.class);
+
+        metadata.getPartitionToReplaceStats().keySet().forEach(partition -> fileSystemView.getAllFileGroups(partition));
+      } catch (IOException e) {
+        throw new HoodieCommitException("Failed to archive because cannot delete replace files", e);
+      }
+    });
+  }
+
+  private boolean deletePath(Path path, HoodieInstant instant) {
+    try {
+      LOG.info("Deleting " + path + " before archiving " + instant);
+      metaClient.getFs().delete(path);

Review comment:
       You probably don't want to fetch the fs object each time? Also, let's delete in parallel from the get-go. We will invariably need to do this, much like parallelizing cleaning.
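
   A minimal sketch of the parallel delete, under stated assumptions: `ParallelDeleteSketch`, `deleteAll` and `demo` are hypothetical names, and `java.nio.file` stands in for the Hadoop FileSystem so the example runs on its own. In the PR, the equivalent would presumably resolve `metaClient.getFs()` once outside the loop and distribute the deletes through the Spark context.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: java.nio.file stands in for the Hadoop FileSystem used in the PR.
final class ParallelDeleteSketch {

  /** Deletes all paths in parallel and returns how many files were actually removed. */
  static long deleteAll(List<Path> paths) {
    return paths.parallelStream().filter(ParallelDeleteSketch::deleteOne).count();
  }

  /** Deletes a single path; a path that is already gone counts as "not deleted". */
  private static boolean deleteOne(Path path) {
    try {
      return Files.deleteIfExists(path);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to delete " + path, e);
    }
  }

  /** Creates two temporary files, deletes them in parallel, and returns the delete count. */
  static long demo() {
    try {
      Path dir = Files.createTempDirectory("replaced-files");
      List<Path> paths = Arrays.asList(
          Files.createFile(dir.resolve("base.parquet")),
          Files.createFile(dir.resolve("delta.log")));
      long deleted = deleteAll(paths);
      Files.delete(dir); // the directory is empty again at this point
      return deleted;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("deleted " + demo());
  }
}
```

   Returning a count rather than logging per file keeps the per-path work small, which matters once the deletes run on executors.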

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {

Review comment:
       Probably add a check that this is a replace instant as well?
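
   Roughly, the guard could look like the sketch below. The `Instant` class is a hypothetical stand-in for HoodieInstant that carries only an action name and a completed flag; the constant value `replacecommit` is the action name this PR adds to ActionType.

```java
// Hypothetical sketch of the guard; Instant is a stand-in for HoodieInstant.
final class ReplaceInstantGuardSketch {

  static final String REPLACE_COMMIT_ACTION = "replacecommit";

  /** Minimal stand-in carrying only what the guard needs. */
  static final class Instant {
    final String action;
    final boolean completed;

    Instant(String action, boolean completed) {
      this.action = action;
      this.completed = completed;
    }
  }

  /** Replaced files are deleted only for instants that are both completed and replace commits. */
  static boolean shouldDeleteReplacedFiles(Instant instant) {
    return instant.completed && REPLACE_COMMIT_ACTION.equals(instant.action);
  }

  public static void main(String[] args) {
    System.out.println(shouldDeleteReplacedFiles(new Instant("replacecommit", true)));
    System.out.println(shouldDeleteReplacedFiles(new Instant("commit", true)));
  }
}
```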

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -576,7 +592,8 @@ public String startCommit() {
       rollbackPendingCommits();
     }
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
-    startCommit(instantTime);
+    HoodieTableMetaClient metaClient = createMetaClient(true);

Review comment:
       Can you just call startCommitWithTime(instantTime) from here?

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView

Review comment:
       Do we need to ask the file system view for all the replaced file groups? This must be in the metadata already, right? As long as we can get the HoodieFileGroup objects corresponding to the file group IDs in the metadata, we can go ahead. What I am suggesting is an alternative and subjectively cleaner replacement for `ensureReplaced...` above, which seems to make a dummy read to warm up the data structures. I prefer to let that happen naturally on its own, as opposed to having this "special" call.
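
   To illustrate the idea (a sketch under assumptions, not the actual view API): the replaced file IDs are read per partition from the commit metadata, and each one is resolved through a lookup. `ReplacedGroupsFromMetadataSketch`, `resolve` and the `lookup` function are hypothetical; in Hudi the lookup would be whatever call returns the file group for a partition and file ID, loading the partition lazily as a side effect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

// Hypothetical sketch: resolve replaced file groups from metadata instead of pre-warming the view.
final class ReplacedGroupsFromMetadataSketch {

  /**
   * Walks the (partition -> replaced file IDs) map taken from the replace metadata and
   * resolves each ID through the lookup. IDs the lookup cannot resolve are skipped.
   */
  static <G> List<G> resolve(Map<String, List<String>> partitionToReplacedFileIds,
                             BiFunction<String, String, Optional<G>> lookup) {
    List<G> groups = new ArrayList<>();
    partitionToReplacedFileIds.forEach((partition, fileIds) ->
        fileIds.forEach(fileId -> lookup.apply(partition, fileId).ifPresent(groups::add)));
    return groups;
  }
}
```

   With this shape, only the partitions named in the metadata are ever touched, and no separate warm-up call is needed.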

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -110,14 +116,16 @@ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActi
    * @param visibleActiveTimeline Visible Active Timeline
    */
   protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
-    this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline.getCommitsAndCompactionTimeline();
+    this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline.getWriteActionTimeline();
+    resetFileGroupsReplaced(visibleCommitsAndCompactionTimeline);
   }
 
   /**
    * Adds the provided statuses into the file system view, and also caches it inside this object.
    */
   protected List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
     HoodieTimer timer = new HoodieTimer().startTimer();
+

Review comment:
       nit: remove extra line?

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertOverwriteCommitActionExecutor.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.ArrayList;
+import java.util.stream.Stream;
+
+public class InsertOverwriteCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends CommitActionExecutor<T> {
+
+  private static final Logger LOG = LogManager.getLogger(InsertOverwriteCommitActionExecutor.class);
+
+  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+
+  public InsertOverwriteCommitActionExecutor(JavaSparkContext jsc,
+                                             HoodieWriteConfig config, HoodieTable table,
+                                             String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+    super(jsc, config, table, instantTime, WriteOperationType.INSERT_OVERWRITE);
+    this.inputRecordsRDD = inputRecordsRDD;
+  }
+
+  @Override
+  public HoodieWriteMetadata execute() {
+    return WriteHelper.write(instantTime, inputRecordsRDD, jsc, (HoodieTable<T>) table,
+        config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
+  }
+
+  @Override
+  protected Partitioner getPartitioner(WorkloadProfile profile) {
+    return new InsertOverwritePartitioner<>(profile, jsc, table, config);
+  }
+
+  @Override
+  protected String getCommitActionType() {
+    return HoodieTimeline.REPLACE_COMMIT_ACTION;
+  }
+
+  @Override
+  protected JavaRDD<WriteStatus> processInputRecords(JavaRDD<HoodieRecord<T>> inputRecordsRDD, WorkloadProfile profile) {
+    // get all existing fileIds to mark them as replaced
+    JavaRDD<WriteStatus> replaceStatuses = getAllReplaceWriteStatus(profile);
+    // do necessary inserts into new file groups
+    JavaRDD<WriteStatus> writeStatuses = super.processInputRecords(inputRecordsRDD, profile);
+    return writeStatuses.union(replaceStatuses);
+  }
+
+  private JavaRDD<WriteStatus> getAllReplaceWriteStatus(WorkloadProfile profile) {
+    JavaRDD<String> partitions = jsc.parallelize(new ArrayList<>(profile.getPartitionPaths()));
+    JavaRDD<WriteStatus> replaceStatuses = partitions.flatMap(partition ->
+        getAllExistingFileIds(partition).map(fileId -> getReplaceWriteStatus(partition, fileId)).iterator());
+
+    return replaceStatuses;
+  }
+
+  private Stream<String> getAllExistingFileIds(String partitionPath) {
+    // because new commit is not complete. it is safe to mark all base files as old files
+    return table.getBaseFileOnlyView().getAllBaseFiles(partitionPath).map(baseFile -> baseFile.getFileId());
+  }
+
+  private WriteStatus getReplaceWriteStatus(String partitionPath, String fileId) {
+    // mark file as 'replaced' in metadata. the actual file will be removed later by cleaner to provide snapshot isolation
+    WriteStatus status = new WriteStatus(false, 0.0);
+    status.setReplacedFileId(true);
+    status.setFileId(fileId);
+    status.setTotalErrorRecords(0);
+    status.setPartitionPath(partitionPath);
+    HoodieWriteStat replaceStat = new HoodieWriteStat();
+    status.setStat(replaceStat);
+    replaceStat.setPartitionPath(partitionPath);
+    replaceStat.setFileId(fileId);
+    replaceStat.setPath(table.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get().getPath());
+    status.getStat().setNumDeletes(Integer.MAX_VALUE);//token to indicate all rows are deleted

Review comment:
       Seeing such large values in the metadata can be a bit confusing. Can we set it to -1 instead for now?

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertOverwriteCommitActionExecutor.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.ArrayList;
+import java.util.stream.Stream;
+
+public class InsertOverwriteCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends CommitActionExecutor<T> {
+
+  private static final Logger LOG = LogManager.getLogger(InsertOverwriteCommitActionExecutor.class);
+
+  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+
+  public InsertOverwriteCommitActionExecutor(JavaSparkContext jsc,
+                                             HoodieWriteConfig config, HoodieTable table,
+                                             String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+    super(jsc, config, table, instantTime, WriteOperationType.INSERT_OVERWRITE);
+    this.inputRecordsRDD = inputRecordsRDD;
+  }
+
+  @Override
+  public HoodieWriteMetadata execute() {
+    return WriteHelper.write(instantTime, inputRecordsRDD, jsc, (HoodieTable<T>) table,
+        config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
+  }
+
+  @Override
+  protected Partitioner getPartitioner(WorkloadProfile profile) {
+    return new InsertOverwritePartitioner<>(profile, jsc, table, config);
+  }
+
+  @Override
+  protected String getCommitActionType() {
+    return HoodieTimeline.REPLACE_COMMIT_ACTION;
+  }
+
+  @Override
+  protected JavaRDD<WriteStatus> processInputRecords(JavaRDD<HoodieRecord<T>> inputRecordsRDD, WorkloadProfile profile) {
+    // get all existing fileIds to mark them as replaced
+    JavaRDD<WriteStatus> replaceStatuses = getAllReplaceWriteStatus(profile);

Review comment:
       So, this creates a dependency on the workloadProfile for doing insert overwrite. While we always provide a WorkloadProfile for now, in the future we would like to remove this need for caching data in memory and building the profile. 
   Can we try to reimplement this such that 
   
    - processInputRecords(..) just writes the new records and returns WriteStatus for the new file groups alone.
    - During commit time, after we collect the WriteStatus, we can obtain the `replaceStatuses` based on the partitions that were actually written to during the step above. 
    
    This also gives us a cleaner solution for avoiding the boolean flag we discussed. The API is also consistent then, in that writeClient.insertOverwrite() only returns the WriteStatus for the new file group IDs. 
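
    The two steps above can be sketched as follows. All names here (`ReplaceAtCommitSketch`, `WriteStat`, `replacedFileIds`) are hypothetical, and the map of existing file IDs is assumed to have been captured before the new file groups became visible.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical sketch: derive replaced file IDs at commit time from the partitions actually written.
final class ReplaceAtCommitSketch {

  /** Stand-in for the write stat of one newly written file group. */
  static final class WriteStat {
    final String partitionPath;
    final String fileId;

    WriteStat(String partitionPath, String fileId) {
      this.partitionPath = partitionPath;
      this.fileId = fileId;
    }
  }

  /**
   * For every partition that received new data, marks all previously existing file IDs
   * as replaced. Partitions that were not written to are left untouched.
   */
  static Map<String, List<String>> replacedFileIds(List<WriteStat> newStats,
                                                   Map<String, List<String>> existingFileIdsByPartition) {
    Set<String> writtenPartitions = newStats.stream()
        .map(stat -> stat.partitionPath)
        .collect(Collectors.toCollection(TreeSet::new));

    Map<String, List<String>> replaced = new TreeMap<>();
    for (String partition : writtenPartitions) {
      replaced.put(partition, new ArrayList<>(
          existingFileIdsByPartition.getOrDefault(partition, Collections.emptyList())));
    }
    return replaced;
  }
}
```

    No workload profile is needed here: the set of partitions comes from the collected write stats alone.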
   
   

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
##########
@@ -94,6 +94,14 @@ public void setWriteStats(List<HoodieWriteStat> writeStats) {
     this.writeStats = Option.of(writeStats);
   }
 
+  public Option<List<HoodieWriteStat>> getReplacetats() {

Review comment:
       typo: getReplaceStats

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -586,15 +603,23 @@ public String startCommit() {
    * @param instantTime Instant time to be generated
    */
   public void startCommitWithTime(String instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    startCommitWithTime(instantTime, metaClient.getCommitActionType());
+  }
+
+  /**
+   * Completes a new commit time for a write operation (insert/update/delete) with specified action.
+   */
+  public void startCommitWithTime(String instantTime, String actionType) {
     // NOTE : Need to ensure that rollback is done before a new commit is started
     if (rollbackPending) {
       // Only rollback inflight commit/delta-commits. Do not touch compaction commits
       rollbackPendingCommits();
     }
-    startCommit(instantTime);
+    startCommit(instantTime, actionType);
   }
 
-  private void startCommit(String instantTime) {
+  private void startCommit(String instantTime, String actionType) {
     LOG.info("Generate a new instant time " + instantTime);
     HoodieTableMetaClient metaClient = createMetaClient(true);

Review comment:
       Can we pass in the `metaClient` from the caller? This seems to introduce additional creations, each of which lists .hoodie again.
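
   Something along these lines, as a sketch only: `MetaClient` is a hypothetical stand-in whose constructor counts creations (each of which would list .hoodie), and the returned file name is illustrative rather than the exact timeline layout.

```java
// Hypothetical sketch: build the meta client once and hand it down to the private helper.
final class MetaClientReuseSketch {

  /** Counts how often the stand-in meta client was built, i.e. how often .hoodie would be listed. */
  static int creations = 0;

  /** Stand-in for HoodieTableMetaClient. */
  static final class MetaClient {
    final String basePath = "/tmp/table";

    MetaClient() {
      creations++;
    }

    String getCommitActionType() {
      return "commit";
    }
  }

  /** Public entry point: creates the meta client exactly once. */
  static String startCommitWithTime(String instantTime) {
    MetaClient metaClient = new MetaClient();
    return startCommit(instantTime, metaClient.getCommitActionType(), metaClient);
  }

  /** Reuses the caller's meta client instead of creating another one. */
  private static String startCommit(String instantTime, String actionType, MetaClient metaClient) {
    return metaClient.basePath + "/.hoodie/" + instantTime + "." + actionType + ".requested";
  }

  public static void main(String[] args) {
    System.out.println(startCommitWithTime("001") + " after " + creations + " creation(s)");
  }
}
```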

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -110,14 +116,16 @@ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActi
    * @param visibleActiveTimeline Visible Active Timeline
    */
   protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
-    this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline.getCommitsAndCompactionTimeline();
+    this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline.getWriteActionTimeline();
+    resetFileGroupsReplaced(visibleCommitsAndCompactionTimeline);

Review comment:
       I think it's sufficient to do this reset in the init() method above, much like the pendingCompaction and bootstrap handling. This method is simply used to refresh the timeline, i.e. the instants that are visible.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
##########
@@ -46,11 +46,12 @@
   public static final String SCHEMA_KEY = "schema";
   private static final Logger LOG = LogManager.getLogger(HoodieCommitMetadata.class);
   protected Map<String, List<HoodieWriteStat>> partitionToWriteStats;
+

Review comment:
       nit: extra line

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertOverwriteCommitActionExecutor.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.ArrayList;
+import java.util.stream.Stream;
+
+public class InsertOverwriteCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends CommitActionExecutor<T> {
+
+  private static final Logger LOG = LogManager.getLogger(InsertOverwriteCommitActionExecutor.class);
+
+  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+
+  public InsertOverwriteCommitActionExecutor(JavaSparkContext jsc,
+                                             HoodieWriteConfig config, HoodieTable table,
+                                             String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+    super(jsc, config, table, instantTime, WriteOperationType.INSERT_OVERWRITE);
+    this.inputRecordsRDD = inputRecordsRDD;
+  }
+
+  @Override
+  public HoodieWriteMetadata execute() {
+    return WriteHelper.write(instantTime, inputRecordsRDD, jsc, (HoodieTable<T>) table,
+        config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
+  }
+
+  @Override
+  protected Partitioner getPartitioner(WorkloadProfile profile) {
+    return new InsertOverwritePartitioner<>(profile, jsc, table, config);
+  }
+
+  @Override
+  protected String getCommitActionType() {
+    return HoodieTimeline.REPLACE_COMMIT_ACTION;
+  }
+
+  @Override
+  protected JavaRDD<WriteStatus> processInputRecords(JavaRDD<HoodieRecord<T>> inputRecordsRDD, WorkloadProfile profile) {
+    // get all existing fileIds to mark them as replaced
+    JavaRDD<WriteStatus> replaceStatuses = getAllReplaceWriteStatus(profile);
+    // do necessary inserts into new file groups
+    JavaRDD<WriteStatus> writeStatuses = super.processInputRecords(inputRecordsRDD, profile);
+    return writeStatuses.union(replaceStatuses);
+  }
+
+  private JavaRDD<WriteStatus> getAllReplaceWriteStatus(WorkloadProfile profile) {
+    JavaRDD<String> partitions = jsc.parallelize(new ArrayList<>(profile.getPartitionPaths()));
+    JavaRDD<WriteStatus> replaceStatuses = partitions.flatMap(partition ->
+        getAllExistingFileIds(partition).map(fileId -> getReplaceWriteStatus(partition, fileId)).iterator());
+
+    return replaceStatuses;
+  }
+
+  private Stream<String> getAllExistingFileIds(String partitionPath) {
+    // because new commit is not complete. it is safe to mark all base files as old files
+    return table.getBaseFileOnlyView().getAllBaseFiles(partitionPath).map(baseFile -> baseFile.getFileId());

Review comment:
       Why limit this to just base files? We may have log files without base files, i.e. the insert-to-log-files code path.
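
   A small sketch of the difference, with hypothetical names throughout (`FileSlice` here is a simplified stand-in, not the Hudi class): collecting file IDs only from base files misses a file group that so far consists of log files alone.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical sketch: file IDs from base files only vs. from all file slices.
final class ReplacedFileIdsSketch {

  /** Simplified stand-in for a file slice: an optional base file plus any number of log files. */
  static final class FileSlice {
    final String fileId;
    final Optional<String> baseFile;
    final List<String> logFiles;

    FileSlice(String fileId, String baseFileOrNull, String... logFiles) {
      this.fileId = fileId;
      this.baseFile = Optional.ofNullable(baseFileOrNull);
      this.logFiles = Arrays.asList(logFiles);
    }
  }

  /** What a base-file-only view sees: log-only file groups are silently skipped. */
  static Set<String> fileIdsWithBaseFiles(List<FileSlice> slices) {
    return slices.stream()
        .filter(slice -> slice.baseFile.isPresent())
        .map(slice -> slice.fileId)
        .collect(Collectors.toCollection(TreeSet::new));
  }

  /** Every file group in the partition, whether or not it has a base file yet. */
  static Set<String> allFileIds(List<FileSlice> slices) {
    return slices.stream()
        .map(slice -> slice.fileId)
        .collect(Collectors.toCollection(TreeSet::new));
  }
}
```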

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -110,14 +116,16 @@ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActi
    * @param visibleActiveTimeline Visible Active Timeline
    */
   protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
-    this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline.getCommitsAndCompactionTimeline();
+    this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline.getWriteActionTimeline();

Review comment:
       Now that replace is also `replacecommit`, should we just leave `getCommitsAndCompactionTimeline()` be?

##########
File path: hudi-common/src/main/avro/HoodieReplaceCommitMetadata.avsc
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+   "namespace":"org.apache.hudi.avro.model",
+   "type":"record",
+   "name":"HoodieReplaceCommitMetadata",
+   "fields":[
+      {
+         "name":"partitionToWriteStats",

Review comment:
       For my own understanding: copying the fields from CommitMetadata is the only way to "inherit" the Avro schema, I guess?
   Also, to be consistent, should we place the base fields from commit metadata first and add `partitionToReplaceStats` at the end?

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/WriteStatus.java
##########
@@ -52,6 +52,9 @@
 
   private HoodieWriteStat stat = null;
 
+  // if true, indicates the fileId in this WriteStatus is being replaced
+  private boolean isReplacedFileId;

Review comment:
       rename: isReplaced

##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -880,6 +880,89 @@ public void testDeletesWithDeleteApi() throws Exception {
     testDeletes(client, updateBatch3.getRight(), 10, file1, "007", 140, keysSoFar);
   }
 
+  /**
+   * Test scenario of writing more file groups than existing number of file groups in partition.
+   */
+  @Test
+  public void testInsertOverwritePartitionHandlingWithMoreRecords() throws Exception {
+    verifyInsertOverwritePartitionHandling(1000, 3000);
+  }
+
+  /**
+   * Test scenario of writing fewer file groups than existing number of file groups in partition.
+   */
+  @Test
+  public void testInsertOverwritePartitionHandlingWithFewerRecords() throws Exception {
+    verifyInsertOverwritePartitionHandling(3000, 1000);
+  }
+
+  /**
+   * Test scenario of writing similar number file groups in partition.
+   */
+  @Test
+  public void testInsertOverwritePartitionHandlinWithSimilarNumberOfRecords() throws Exception {
+    verifyInsertOverwritePartitionHandling(3000, 3000);
+  }
+
+  /**
+   *  1) Do write1 (upsert) with 'batch1RecordsCount' number of records.
+   *  2) Do write2 (insert overwrite) with 'batch2RecordsCount' number of records.
+   *
+   *  Verify that all records in step1 are overwritten
+   */
+  private void verifyInsertOverwritePartitionHandling(int batch1RecordsCount, int batch2RecordsCount) throws Exception {
+    final String testPartitionPath = "americas";
+    HoodieWriteConfig config = getSmallInsertWriteConfig(2000);
+    HoodieWriteClient client = getHoodieWriteClient(config, false);
+    dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
+
+    // Do Inserts
+    String commitTime1 = "001";
+    client.startCommitWithTime(commitTime1);
+    List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, batch1RecordsCount);
+    JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 2);
+    List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
+    assertNoWriteErrors(statuses);
+    Set<String> batch1Buckets = statuses.stream().map(s -> s.getFileId()).collect(Collectors.toSet());
+    verifyParquetFileData(commitTime1, inserts1, statuses);
+
+    // Do Insert Overwrite
+    String commitTime2 = "002";
+    client.startCommitWithTime(commitTime2, HoodieTimeline.REPLACE_COMMIT_ACTION);
+    List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, batch2RecordsCount);
+    List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>();
+    insertsAndUpdates2.addAll(inserts2);
+    JavaRDD<HoodieRecord> insertAndUpdatesRDD2 = jsc.parallelize(insertsAndUpdates2, 2);
+    statuses = client.insertOverwrite(insertAndUpdatesRDD2, commitTime2).collect();
+    assertNoWriteErrors(statuses);
+    Set<String> replacedBuckets = statuses.stream().filter(s -> s.isReplacedFileId())
+        .map(s -> s.getFileId()).collect(Collectors.toSet());
+    assertEquals(batch1Buckets, replacedBuckets);
+    List<WriteStatus> newBuckets = statuses.stream().filter(s -> !(s.isReplacedFileId()))
+        .collect(Collectors.toList());
+    verifyParquetFileData(commitTime2, inserts2, newBuckets);
+  }
+
+  /**
+   * Verify data in parquet files matches expected records and commit time.
+   */
+  private void verifyParquetFileData(String commitTime, List<HoodieRecord> expectedRecords, List<WriteStatus> allStatus) {

Review comment:
       Please keep the test and its naming to base and log files, and do not leak parquet into the test. Also, can you please see if this test can be authored by reusing existing helpers? It's often a bit hard to read and reuse the existing helpers, but the more one-offs we introduce, the worse this situation becomes.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper class to generate compaction plan from FileGroup/FileSlice abstraction.
+ */
+public class CommitUtils {
+
+  private static final Logger LOG = LogManager.getLogger(CommitUtils.class);
+
+  public static HoodieCommitMetadata buildWriteActionMetadata(List<HoodieWriteStat> writeStats,

Review comment:
       Please add a simple unit test for this, e.g. testing that the schema is set, the op type is set, etc.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -196,6 +205,32 @@ protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
     return fileGroups;
   }
 
+  /**
+   * Get replaced instant for each file group by looking at all commit instants.
+   */
+  private void resetFileGroupsReplaced(HoodieTimeline timeline) {
+    Instant indexStartTime = Instant.now();

Review comment:
       please use HoodieTimer to time code segments.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/ActionType.java
##########
@@ -22,5 +22,5 @@
  * The supported action types.
  */
 public enum ActionType {
-  commit, savepoint, compaction, clean, rollback
+  commit, savepoint, compaction, clean, rollback, replacecommit

Review comment:
       deltacommit is not here. Huh. File a "code cleanup" JIRA for later?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -880,6 +957,30 @@ private FileSlice fetchMergedFileSlice(HoodieFileGroup fileGroup, FileSlice file
         .fromJavaOptional(fetchLatestFileSlices(partitionPath).filter(fs -> fs.getFileId().equals(fileId)).findFirst());
   }
 
+  private boolean isFileGroupReplaced(HoodieFileGroup fileGroup) {
+    Option<HoodieInstant> hoodieInstantOption = getReplacedInstant(fileGroup.getFileGroupId());
+    return hoodieInstantOption.isPresent();
+  }
+
+  private boolean isFileGroupReplacedBeforeAny(HoodieFileGroup fileGroup, List<String> instants) {
+    Option<HoodieInstant> hoodieInstantOption = getReplacedInstant(fileGroup.getFileGroupId());
+    if (!hoodieInstantOption.isPresent()) {
+      return false;
+    }
+
+    return HoodieTimeline.compareTimestamps(instants.stream().max(Comparator.naturalOrder()).get(),

Review comment:
       Can you just call isFileGroupReplacedBeforeOrOn(fileGroup, max(instants)), without having to implement this again?
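
   As a sketch of that delegation (assumptions: instant times are fixed-width strings that compare lexicographically, the replace instant is modelled as an `Optional<String>`, and both method names below are simplified stand-ins for the ones in the view):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: the "before any" check expressed through the "before or on" check.
final class ReplacedBeforeSketch {

  /** True if the file group was replaced at or before the given instant time. */
  static boolean isReplacedBeforeOrOn(Optional<String> replaceInstant, String instant) {
    return replaceInstant.map(time -> time.compareTo(instant) <= 0).orElse(false);
  }

  /**
   * Delegates to isReplacedBeforeOrOn using the largest instant in the list.
   * Unlike a bare max(...).get(), an empty list yields false instead of throwing.
   */
  static boolean isReplacedBeforeAny(Optional<String> replaceInstant, List<String> instants) {
    return instants.stream()
        .max(Comparator.naturalOrder())
        .map(max -> isReplacedBeforeOrOn(replaceInstant, max))
        .orElse(false);
  }
}
```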

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * All the metadata that gets stored along with a commit.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class HoodieReplaceCommitMetadata extends HoodieCommitMetadata {
+  private static final Logger LOG = LogManager.getLogger(HoodieReplaceCommitMetadata.class);
+  protected Map<String, List<HoodieWriteStat>> partitionToReplaceStats;

Review comment:
       These are the file groups being replaced? I thought we were going to just track the file IDs.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -727,6 +775,21 @@ private String formatPartitionKey(String partitionStr) {
    */
   abstract Stream<HoodieFileGroup> fetchAllStoredFileGroups();
 
+  /**
+   * Track instant time for file groups replaced.
+   */
+  protected abstract void resetReplacedFileGroups(final Map<HoodieFileGroupId, HoodieInstant> replacedFileGroups);
+
+  /**
+   * Track instant time for new file groups replaced.
+   */
+  protected abstract void addReplacedFileGroups(final Map<HoodieFileGroupId, HoodieInstant> replacedFileGroups);
+
+  /**
+   * Track instant time for file groups replaced.
+   */
+  protected abstract Option<HoodieInstant> getReplacedInstant(final HoodieFileGroupId fileGroupId);

Review comment:
       rename: getReplaceInstant() 

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
##########
@@ -113,6 +112,18 @@ public HoodieDefaultTimeline getCommitsAndCompactionTimeline() {
     return new HoodieDefaultTimeline(instants.stream().filter(s -> validActions.contains(s.getAction())), details);
   }
 
+  @Override
+  public HoodieDefaultTimeline getWriteActionTimeline() {
+    Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
+    return new HoodieDefaultTimeline(instants.stream().filter(s -> validActions.contains(s.getAction())), details);
+  }
+
+  @Override
+  public HoodieTimeline getCompletedAndReplaceTimeline() {

Review comment:
       rename: getCompletedReplaceTimeline(). The current name gives the impression that it is either completed or replacecommit.

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
##########
@@ -200,6 +199,14 @@ public static void createInflightCommitFiles(String basePath, String... instantT
     }
   }
 
+  public static HoodieWriteStat createReplaceStat(final String partitionPath, final String fileId1) {

Review comment:
       There is nothing specific to replace in this method. Should we move it into the test class itself, or inline it?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
##########
@@ -64,6 +64,11 @@
    */
   protected Map<HoodieFileGroupId, BootstrapBaseFileMapping> fgIdToBootstrapBaseFile;
 
+  /**
+   * Track replace time for replaced file groups.
+   */
+  protected Map<HoodieFileGroupId, HoodieInstant> fgIdToReplaceInstant;

Review comment:
       rename: fgIdToReplaceInstants
   

##########
File path: hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc
##########
@@ -36,6 +36,14 @@
          ],
          "default": null
       },
+      {

Review comment:
       Should we just add this at the end? I am not sure it will be backwards compatible otherwise.

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
##########
@@ -366,6 +378,14 @@ public static void createCompactionRequestedFile(String basePath, String instant
     createEmptyFile(basePath, commitFile, configuration);
   }
 
+  public static void createDataFile(String basePath, String partitionPath, String instantTime, String fileID, Configuration configuration)

Review comment:
       for tests, I suggest using the `HoodieWritableTestTable` etc instead of introducing new methods. Also please check other utilities to avoid writing a new method here

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -376,31 +379,35 @@ private[hudi] object HoodieSparkSqlWriter {
     metaSyncSuccess
   }
 
+  /**
+   * Scala says method cannot have more than 7 arguments. So group all table/action specific information into a case class.

Review comment:
       Please remove the Scala-specific comment. It's good to encapsulate like this anyway.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
##########
@@ -371,6 +371,47 @@ void removeBootstrapBaseFileMapping(Stream<BootstrapBaseFileMapping> bootstrapBa
         schemaHelper.getPrefixForSliceViewByPartitionFile(partitionPath, fileId)).map(Pair::getValue)).findFirst());
   }
 
+  @Override
+  protected void resetReplacedFileGroups(final Map<HoodieFileGroupId, HoodieInstant> replacedFileGroups) {

Review comment:
       @bvaradar if you can take a pass at these, that would be great 

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView

Review comment:
       One more consideration, as I went through the remainder of the PR: if there was a pending compaction for the replaced file group, then the file group metadata we encode may miss new base files produced as a result of the compaction. This scenario needs to be thought through.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
##########
@@ -133,6 +137,21 @@
    */
   HoodieTimeline getCommitsAndCompactionTimeline();
 
+  /**
+   * Timeline to just include commits (commit/deltacommit), replace and compaction actions.
+   *
+   * @return
+   */
+  HoodieDefaultTimeline getWriteActionTimeline();

Review comment:
       see earlier comment on whether we need this method. 

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView

Review comment:
       As an afterthought, I also realize that if we just encoded the entire file group being replaced into the metadata (as opposed to just encoding the file IDs), we could simply delete the file groups without any interaction with TableFileSystemView at all. Probably a simpler solution, even?
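
   A minimal sketch of that idea, assuming the replace metadata lists every base and log file of the replaced file groups per partition. `SketchReplacedFileCleaner`, the metadata layout and the `deleteFn` callback are hypothetical and only stand in for the real metadata class and file system:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical helper: deletes replaced files using only what the metadata records.
final class SketchReplacedFileCleaner {

  /**
   * @param partitionToReplacedFiles partition path -> names of all files (base and log)
   *                                 in the replaced file groups (assumed metadata layout)
   * @param deleteFn                 stand-in for the storage delete call; returns true if deleted
   * @return the paths that were actually deleted
   */
  static List<String> deleteReplacedFiles(Map<String, List<String>> partitionToReplacedFiles,
                                          Predicate<String> deleteFn) {
    List<String> deleted = new ArrayList<>();
    for (Map.Entry<String, List<String>> entry : partitionToReplacedFiles.entrySet()) {
      for (String fileName : entry.getValue()) {
        String path = entry.getKey() + "/" + fileName;
        // No file system view lookup: the metadata alone decides what to delete.
        if (deleteFn.test(path)) {
          deleted.add(path);
        }
      }
    }
    return deleted;
  }
}
```

   Note that this only deletes what was recorded at replace time, so files written later (for example by a pending compaction on a replaced file group) would be missed unless they are handled separately.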

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
##########
@@ -133,6 +137,21 @@
    */
   HoodieTimeline getCommitsAndCompactionTimeline();
 
+  /**
+   * Timeline to just include commits (commit/deltacommit), replace and compaction actions.
+   *
+   * @return
+   */
+  HoodieDefaultTimeline getWriteActionTimeline();

Review comment:
       To expand, I am wondering if we should just include replacecommit within `getCommitsAndCompactionTimeline()`. Most of its callers are around compaction/savepoint/restore etc. So we may not be seeing some cases here. 
   
   Things work for now, since filterCompletedInstants() etc are including replace commit in the timeline when filtering for queries. Semantically, if replace is a commit level action that can add new data to the timeline, then we should just treat it like delta commit IMO. 
   
   Would anything break if we did do that? 
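
   A minimal sketch of what this would look like, assuming a simplified timeline in which an instant is just a timestamp plus an action string. `SketchTimeline`, `Instant` and the action names are illustrative stand-ins, not Hudi's actual classes:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative stand-in for a timeline; not HoodieDefaultTimeline.
final class SketchTimeline {
  // Action names are assumptions made for this sketch.
  static final String COMMIT = "commit";
  static final String DELTA_COMMIT = "deltacommit";
  static final String COMPACTION = "compaction";
  static final String REPLACE_COMMIT = "replacecommit";
  static final String CLEAN = "clean";

  static final class Instant {
    final String timestamp;
    final String action;

    Instant(String timestamp, String action) {
      this.timestamp = timestamp;
      this.action = action;
    }
  }

  // Treats replace like any other commit-level action instead of adding a new accessor.
  static List<Instant> commitsAndCompaction(List<Instant> instants) {
    Set<String> validActions =
        new HashSet<>(Arrays.asList(COMMIT, DELTA_COMMIT, COMPACTION, REPLACE_COMMIT));
    return instants.stream()
        .filter(instant -> validActions.contains(instant.action))
        .collect(Collectors.toList());
  }
}
```

   With this shape, every existing caller of the commits-and-compaction view would start seeing replace instants, which is exactly why the callers need to be reviewed.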
   
   

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * All the metadata that gets stored along with a commit.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class HoodieReplaceCommitMetadata extends HoodieCommitMetadata {
+  private static final Logger LOG = LogManager.getLogger(HoodieReplaceCommitMetadata.class);
+  protected Map<String, List<HoodieWriteStat>> partitionToReplaceStats;

Review comment:
       If these are the file groups being replaced, then does this contain all the file slices? (See my comment around deleting the replaced file groups in the timeline archive log.)

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -213,6 +213,12 @@ public abstract HoodieWriteMetadata insertPrepped(JavaSparkContext jsc, String i
   public abstract HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instantTime,
       JavaRDD<HoodieRecord<T>> preppedRecords,  Option<BulkInsertPartitioner> bulkInsertPartitioner);
 
+  /**
+   * Logically delete all existing records and Insert a batch of new records into Hoodie table at the supplied instantTime.
+   */
+  public abstract HoodieWriteMetadata insertOverwrite(JavaSparkContext jsc, String instantTime,

Review comment:
       On second thought, I am okay leaving this as-is for now, and re-evaluating when actually implementing clustering.

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -111,6 +111,9 @@ private[hudi] object HoodieSparkSqlWriter {
         tableConfig = tableMetaClient.getTableConfig
       }
 
+      val metaClient = new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get)
+      val commitActionType = DataSourceUtils.getCommitActionType(operation, metaClient)

Review comment:
       Should we have a better way of getting the commit action type? I am a bit concerned about creating a new metaClient just for this.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
##########
@@ -371,6 +371,47 @@ void removeBootstrapBaseFileMapping(Stream<BootstrapBaseFileMapping> bootstrapBa
         schemaHelper.getPrefixForSliceViewByPartitionFile(partitionPath, fileId)).map(Pair::getValue)).findFirst());
   }
 
+  @Override
+  protected void resetReplacedFileGroups(final Map<HoodieFileGroupId, HoodieInstant> replacedFileGroups) {

Review comment:
       A general question I have around the incremental file system view and RocksDB-like persistent file system view storage is whether we will keep this list updated, i.e., when archival/cleaning runs, how do we ensure the deleted replaced file groups are no longer tracked inside RocksDB?
   
   I guess the lines below, are doing a bulk delete and insert to achieve the same?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r486369440



##########
File path: hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -87,44 +88,55 @@ protected AbstractHoodieWriteClient(JavaSparkContext jsc, HoodieIndex index, Hoo
    * Commit changes performed at the given instantTime marker.
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses) {
-    return commit(instantTime, writeStatuses, Option.empty());
+    HoodieTableMetaClient metaClient = createMetaClient(false);
+    String actionType = metaClient.getCommitActionType();
+    return commit(instantTime, writeStatuses, Option.empty(), actionType);

Review comment:
       can't we just call `commit(String, JavaRDD, Option.empty())` without having to implement this?







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488216199



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertOverwriteCommitActionExecutor.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.ArrayList;
+import java.util.stream.Stream;
+
+public class InsertOverwriteCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends CommitActionExecutor<T> {
+
+  private static final Logger LOG = LogManager.getLogger(InsertOverwriteCommitActionExecutor.class);
+
+  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+
+  public InsertOverwriteCommitActionExecutor(JavaSparkContext jsc,
+                                             HoodieWriteConfig config, HoodieTable table,
+                                             String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+    super(jsc, config, table, instantTime, WriteOperationType.INSERT_OVERWRITE);
+    this.inputRecordsRDD = inputRecordsRDD;
+  }
+
+  @Override
+  public HoodieWriteMetadata execute() {
+    return WriteHelper.write(instantTime, inputRecordsRDD, jsc, (HoodieTable<T>) table,
+        config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
+  }
+
+  @Override
+  protected Partitioner getPartitioner(WorkloadProfile profile) {
+    return new InsertOverwritePartitioner<>(profile, jsc, table, config);
+  }
+
+  @Override
+  protected String getCommitActionType() {
+    return HoodieTimeline.REPLACE_COMMIT_ACTION;
+  }
+
+  @Override
+  protected JavaRDD<WriteStatus> processInputRecords(JavaRDD<HoodieRecord<T>> inputRecordsRDD, WorkloadProfile profile) {
+    // get all existing fileIds to mark them as replaced
+    JavaRDD<WriteStatus> replaceStatuses = getAllReplaceWriteStatus(profile);
+    // do necessary inserts into new file groups
+    JavaRDD<WriteStatus> writeStatuses = super.processInputRecords(inputRecordsRDD, profile);
+    return writeStatuses.union(replaceStatuses);
+  }
+
+  private JavaRDD<WriteStatus> getAllReplaceWriteStatus(WorkloadProfile profile) {
+    JavaRDD<String> partitions = jsc.parallelize(new ArrayList<>(profile.getPartitionPaths()));
+    JavaRDD<WriteStatus> replaceStatuses = partitions.flatMap(partition ->
+        getAllExistingFileIds(partition).map(fileId -> getReplaceWriteStatus(partition, fileId)).iterator());
+
+    return replaceStatuses;
+  }
+
+  private Stream<String> getAllExistingFileIds(String partitionPath) {
+    // because new commit is not complete. it is safe to mark all base files as old files
+    return table.getBaseFileOnlyView().getAllBaseFiles(partitionPath).map(baseFile -> baseFile.getFileId());

Review comment:
       My bad, I missed the case of inserts into log files. I have fixed it now. Thanks for finding this bug.







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488990799



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView
+        .getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+    fileGroupsToDelete.forEach(fg -> {
+      fg.getAllRawFileSlices().forEach(fileSlice -> {
+        fileSlice.getBaseFile().map(baseFile -> deletePath(baseFile.getFileStatus().getPath(), instant));
+        fileSlice.getLogFiles().forEach(logFile -> deletePath(logFile.getPath(), instant));
+      });
+    });
+  }
+
+  /**
+   * Because we are creating new 'HoodieTable' and FileSystemView objects in this class constructor,
+   * partition view may not be loaded correctly.
+   * Reload all partitions modified by REPLACE action
+   *
+   * TODO find a better way to pass the FileSystemView to this class.
+   */
+  private void ensureReplacedPartitionsLoadedCorrectly(HoodieInstant instant, TableFileSystemView fileSystemView) {
+    Option<HoodieInstant> replaceInstantOption = metaClient.getActiveTimeline().getCompletedAndReplaceTimeline()
+        .filter(replaceInstant -> replaceInstant.getTimestamp().equals(instant.getTimestamp())).firstInstant();
+
+    replaceInstantOption.ifPresent(replaceInstant -> {
+      try {
+        HoodieReplaceCommitMetadata metadata = HoodieReplaceCommitMetadata.fromBytes(
+            metaClient.getActiveTimeline().getInstantDetails(replaceInstant).get(),
+            HoodieReplaceCommitMetadata.class);
+
+        metadata.getPartitionToReplaceStats().keySet().forEach(partition -> fileSystemView.getAllFileGroups(partition));
+      } catch (IOException e) {
+        throw new HoodieCommitException("Failed to archive because cannot delete replace files", e);
+      }
+    });
+  }
+
+  private boolean deletePath(Path path, HoodieInstant instant) {
+    try {
+      LOG.info("Deleting " + path + " before archiving " + instant);
+      metaClient.getFs().delete(path);

Review comment:
       For parallel deletes, JavaSparkContext is not exposed to the archive process. Since we want to move this to be part of clean anyway, is it OK if I defer this to https://issues.apache.org/jira/browse/HUDI-1276?







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488226577



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -111,6 +111,9 @@ private[hudi] object HoodieSparkSqlWriter {
         tableConfig = tableMetaClient.getTableConfig
       }
 
+      val metaClient = new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get)
+      val commitActionType = DataSourceUtils.getCommitActionType(operation, metaClient)

Review comment:
       We just need tableType to create commit/deltacommit in the default case. I moved this to a static method in util class and removed dependency on MetaClient.
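
   Roughly, the util method has this shape. This is a sketch only: `SketchCommitActionTypes`, its enums and the returned strings are simplified stand-ins for the real table type, operation type and timeline constants:

```java
// Simplified stand-in for the static util method; names here are illustrative.
final class SketchCommitActionTypes {
  enum TableType { COPY_ON_WRITE, MERGE_ON_READ }

  enum Operation { INSERT, UPSERT, BULK_INSERT, INSERT_OVERWRITE }

  // Derives the action type from the operation and table type alone, so no meta client is needed.
  static String getCommitActionType(Operation operation, TableType tableType) {
    if (operation == Operation.INSERT_OVERWRITE) {
      return "replacecommit";
    }
    // Default case: the action type follows the table type.
    return tableType == TableType.MERGE_ON_READ ? "deltacommit" : "commit";
  }
}
```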







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r490491745



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView

Review comment:
       So, this is tricky to explain. In FileSystemView, only metadata seems to be eagerly loaded; file groups are not, i.e., fetchAllStoredFileGroups() returns empty. For replace instants, we need to get the List<FileSlice> for every replaced fileId. Because fetchAllStoredFileGroups() is empty, that lookup also returns an empty list of FileSlices, so we don't delete the replaced files.
   
   I think that, instead of creating a new HoodieTable in the constructor, passing it in from callers would help work around this problem. But that is a somewhat involved change because of test dependencies. Also, it might be better to refresh partition content in case new files are created by compaction or another process and somehow that is not reflected in table views. This might be the safer option.
   
   Let me know if you want me to work on passing in HoodieTable to HoodieTimelineArchiveLog constructor.
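
   To illustrate the behavior, here is a toy model of a lazily loaded view. `SketchLazyView` and its methods are hypothetical and only mimic the pattern: a query over stored file groups sees nothing until the partition has been touched through getAllFileGroups.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Toy model of a lazily loaded file system view; not Hudi's implementation.
final class SketchLazyView {
  private final Function<String, List<String>> partitionLister; // stand-in for listing storage
  private final Set<String> replacedFileIds;
  private final Map<String, List<String>> loadedPartitions = new HashMap<>();

  SketchLazyView(Function<String, List<String>> partitionLister, Set<String> replacedFileIds) {
    this.partitionLister = partitionLister;
    this.replacedFileIds = replacedFileIds;
  }

  // Loads the partition on first access, then serves it from memory.
  List<String> getAllFileGroups(String partition) {
    return loadedPartitions.computeIfAbsent(partition, partitionLister);
  }

  // Only inspects partitions that are already loaded, so it can return nothing
  // even though replaced file groups exist on storage.
  List<String> fetchStoredReplacedFileIds() {
    List<String> result = new ArrayList<>();
    for (List<String> fileIds : loadedPartitions.values()) {
      for (String fileId : fileIds) {
        if (replacedFileIds.contains(fileId)) {
          result.add(fileId);
        }
      }
    }
    return result;
  }
}
```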







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r492303122



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
##########
@@ -251,6 +262,28 @@ private void addRollbackInstant(HoodieTimeline timeline, HoodieInstant instant)
     LOG.info("Done Syncing rollback instant (" + instant + ")");
   }
 
+  /**
+   * Add newly found REPLACE instant.
+   *
+   * @param timeline Hoodie Timeline
+   * @param instant REPLACE Instant
+   */
+  private void addReplaceInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {

Review comment:
       I need to understand this flow a bit more, but I have a question on why we need to track commit-action-type and timestamp. Today, HoodieRollbackMetadata tracks successFiles, deletedFiles, etc. Do you think we can add replacedFileIds there as well? This will be empty for regular commits, but for replace commits it will have some content. If this value is present, we can remove the corresponding fileIds from View#replacedFileGroups. Let me know if I'm missing anything with this approach.
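
   A rough sketch of what I have in mind, assuming the rollback metadata carries a list of replaced file IDs. `SketchReplacedFileGroupTracker` and its methods are hypothetical names, and the map stands in for View#replacedFileGroups:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical tracker standing in for the view's replaced-file-group state.
final class SketchReplacedFileGroupTracker {
  // fileId -> instant time of the replace commit that replaced it
  private final Map<String, String> replacedFileIdToInstant = new HashMap<>();

  // Called when a replace instant is synced into the view.
  void addReplaceInstant(String instantTime, List<String> replacedFileIds) {
    replacedFileIds.forEach(fileId -> replacedFileIdToInstant.put(fileId, instantTime));
  }

  // Called when a rollback is synced; the list is empty when a regular commit was rolled back.
  void applyRollback(List<String> replacedFileIdsInRollbackMetadata) {
    replacedFileIdsInRollbackMetadata.forEach(fileId -> replacedFileIdToInstant.remove(fileId));
  }

  boolean isReplaced(String fileId) {
    return replacedFileIdToInstant.containsKey(fileId);
  }
}
```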







[GitHub] [hudi] bvaradar commented on pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
bvaradar commented on pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#issuecomment-696208094


   @satishkotha : Please ping me in the PR  when you have updates and I can give incremental comments if needed.
   





[GitHub] [hudi] bvaradar commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r479626501



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -173,29 +180,59 @@ protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
     List<HoodieFileGroup> fileGroups = new ArrayList<>();
     fileIdSet.forEach(pair -> {
       String fileId = pair.getValue();
-      HoodieFileGroup group = new HoodieFileGroup(pair.getKey(), fileId, timeline);
-      if (baseFiles.containsKey(pair)) {
-        baseFiles.get(pair).forEach(group::addBaseFile);
-      }
-      if (logFiles.containsKey(pair)) {
-        logFiles.get(pair).forEach(group::addLogFile);
-      }
+      String partitionPath = pair.getKey();
+      if (isExcludeFileGroup(partitionPath, fileId)) {

Review comment:
       As discussed, let's retain all the file groups but perform the filtering in the get APIs. This would avoid correctness issues in filtering and also makes handling the incremental file system view easier.
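
   A minimal sketch of the approach, assuming instant times are fixed-width strings that compare lexicographically. `SketchFileGroupView` and its methods are hypothetical, not the actual view classes:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical, simplified view: stores everything, filters only on read.
final class SketchFileGroupView {
  // Every file group seen in a partition is kept, replaced or not.
  private final Map<String, List<String>> partitionToFileIds = new HashMap<>();
  // "partition/fileId" -> instant time of the replace commit that replaced it.
  private final Map<String, String> replacedAt = new HashMap<>();

  void addFileGroup(String partition, String fileId) {
    partitionToFileIds.computeIfAbsent(partition, p -> new ArrayList<>()).add(fileId);
  }

  void markReplaced(String partition, String fileId, String replaceInstantTime) {
    replacedAt.put(partition + "/" + fileId, replaceInstantTime);
  }

  // Raw accessor: nothing is filtered out.
  List<String> getAllStoredFileIds(String partition) {
    return new ArrayList<>(partitionToFileIds.getOrDefault(partition, new ArrayList<>()));
  }

  // Read-side accessor: hides groups replaced at or before maxInstantTime.
  List<String> getVisibleFileIds(String partition, String maxInstantTime) {
    return getAllStoredFileIds(partition).stream()
        .filter(fileId -> {
          String replacedTime = replacedAt.get(partition + "/" + fileId);
          return replacedTime == null || replacedTime.compareTo(maxInstantTime) > 0;
        })
        .collect(Collectors.toList());
  }
}
```

   Because the stored state never drops a file group, an incremental sync only has to update the replaced map, and a query as of an older instant can still see groups that were replaced later.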

##########
File path: hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
##########
@@ -89,6 +89,10 @@ public FileSliceHandler(Configuration conf, FileSystemViewManager viewManager) t
         .collect(Collectors.toList());
   }
 
+  public List<String> getExcludeFileGroups(String basePath, String partitionPath) {

Review comment:
       Rename to getReplacedFileGroups

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
##########
@@ -355,6 +357,18 @@ public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaC
     }
   }
 
+  @Override
+  public Stream<String> getAllExcludeFileGroups(final String partitionPath) {

Review comment:
       getAllExcludeFileGroups -> getReplacedFileGroups ?

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
##########
@@ -104,4 +104,8 @@ protected SyncableFileSystemView getFileSystemViewWithUnCommittedSlices(HoodieTa
   protected HoodieTableType getTableType() {
     return HoodieTableType.COPY_ON_WRITE;
   }
+
+  protected boolean areTimeTravelTestsEnabled() {

Review comment:
       why is this needed ?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceStat.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Statistics about a single Hoodie replace operation.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class HoodieReplaceStat extends HoodieWriteStat {
+
+  // records from the 'getFileId()' can be written to multiple new file groups. This list tracks all new fileIds
+  private List<String> newFileIds;

Review comment:
       Why are we not tracking dropped fileIds ?

##########
File path: hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
##########
@@ -284,6 +284,13 @@ private void registerFileSlicesAPI() {
       writeValueAsString(ctx, dtos);
     }, true));
 
+    app.get(RemoteHoodieTableFileSystemView.ALL_EXCLUDE_FILEGROUPS_FOR_PARTITION_URL, new ViewHandler(ctx -> {

Review comment:
       ALL_EXCLUDE_FILEGROUPS_FOR_PARTITION_URL -> REPLACED_FILEGROUPS_FOR_PARTITION_URL
   
   Let's use the single consistent name "replaced" instead of "exclude" everywhere.

##########
File path: hudi-spark/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
##########
@@ -102,7 +102,7 @@ public void commit(WriterCommitMessage[] messages) {
             .flatMap(m -> m.getWriteStatuses().stream().map(m2 -> m2.getStat())).collect(Collectors.toList());
 
     try {
-      writeClient.commitStats(instantTime, writeStatList, Option.empty());
+      writeClient.commitStats(instantTime, writeStatList, Option.empty(), HoodieTimeline.COMMIT_ACTION); //TODO get action type from HoodieWriterCommitMessage

Review comment:
       Are you planning to address this TODO as part of this PR ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] satishkotha edited a comment on pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha edited a comment on pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#issuecomment-696327041


   > @satishkotha : Please ping me in the PR when you have updates and I can give incremental comments if needed.
   
   @bvaradar Incremental FileSystem restore is the only big pending item. I'll get to it in the later part of this week.





[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488207278



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView

Review comment:
       Yes, compaction is the primary reason I recorded only the fileId in the replace metadata. When deleting, we can get all file paths that have the same fileId (through the view, or by listing using consolidated metadata) and delete those files.
   
   There can be a race condition in which compaction creates a new file with a replaced fileId after we have queried for existing files. But because the FileSystemView#get methods do not include replaced file groups, I think this is unlikely to happen. I'm not sure whether there are edge cases with long-running compactions.
   
   Please suggest any other improvements.
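
The deletion-by-fileId idea above can be sketched roughly as follows. This is an illustration only: the `<fileId>_<suffix>` naming convention and all names in the class are assumptions made for the sketch, not Hudi's real file naming or API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class ReplacedFileCleanupSketch {
  // Assumed naming convention for this sketch only: "<fileId>_<suffix>".
  static String fileIdOf(String fileName) {
    int idx = fileName.indexOf('_');
    return idx < 0 ? fileName : fileName.substring(0, idx);
  }

  // Select every file in the listing whose fileId was recorded as replaced.
  // Files written later under the same fileId (e.g. by compaction) are picked
  // up too, because only the fileId is matched, not a specific path.
  static List<String> filesToDelete(List<String> partitionListing, Set<String> replacedFileIds) {
    return partitionListing.stream()
        .filter(name -> replacedFileIds.contains(fileIdOf(name)))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> listing = Arrays.asList("fg-1_base-001", "fg-1_log-001", "fg-2_base-001");
    System.out.println(filesToDelete(listing, Collections.singleton("fg-1")));
  }
}
```

The race described above still applies to this sketch: a file created under a replaced fileId after the listing was taken would be missed by that pass.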







[GitHub] [hudi] bvaradar commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r479632381



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
##########
@@ -148,6 +148,9 @@
    */
   Stream<HoodieFileGroup> getAllFileGroups(String partitionPath);
 
+  Stream<String> getAllExcludeFileGroups(String partitionPath);

Review comment:
       I think I have mentioned this elsewhere. Let's denote this feature with the consistent term "Replace" everywhere.







[GitHub] [hudi] bvaradar merged pull request #2048: [HUDI-1072] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
bvaradar merged pull request #2048:
URL: https://github.com/apache/hudi/pull/2048


   





[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r494727484



##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -586,24 +602,39 @@ public String startCommit() {
    * @param instantTime Instant time to be generated
    */
   public void startCommitWithTime(String instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);

Review comment:
       This does not call the function on the next line; it calls the method after that. So we only create the meta client once. Please double-check and let me know if I'm misinterpreting your suggestion.
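
A minimal sketch of the overload chain being discussed, to show why the meta client is created only once per call. Everything here is a hypothetical simplification: the meta client is reduced to a string, the default action is a placeholder, and a counter stands in for the cost of `createMetaClient`.

```java
// Hypothetical simplification of the overload chain; not the real HoodieWriteClient.
class StartCommitSketch {
  int metaClientsCreated = 0;
  String lastAction;

  private String createMetaClient() {
    metaClientsCreated++;
    return "metaClient#" + metaClientsCreated;
  }

  // Public entry point 1: default action type (placeholder value here).
  void startCommitWithTime(String instantTime) {
    String metaClient = createMetaClient();
    // Delegates straight to the private three-argument overload, skipping the
    // two-argument public overload that would create a second meta client.
    startCommitWithTime(instantTime, "commit", metaClient);
  }

  // Public entry point 2: caller-supplied action type.
  void startCommitWithTime(String instantTime, String actionType) {
    String metaClient = createMetaClient();
    startCommitWithTime(instantTime, actionType, metaClient);
  }

  // Shared private implementation: reuses the meta client it is handed.
  private void startCommitWithTime(String instantTime, String actionType, String metaClient) {
    lastAction = actionType;
  }

  public static void main(String[] args) {
    StartCommitSketch client = new StartCommitSketch();
    client.startCommitWithTime("001");
    System.out.println("meta clients created: " + client.metaClientsCreated);
  }
}
```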







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488212928



##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -586,15 +603,23 @@ public String startCommit() {
    * @param instantTime Instant time to be generated
    */
   public void startCommitWithTime(String instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    startCommitWithTime(instantTime, metaClient.getCommitActionType());
+  }
+
+  /**
+   * Completes a new commit time for a write operation (insert/update/delete) with specified action.
+   */
+  public void startCommitWithTime(String instantTime, String actionType) {
     // NOTE : Need to ensure that rollback is done before a new commit is started
     if (rollbackPending) {
       // Only rollback inflight commit/delta-commits. Do not touch compaction commits
       rollbackPendingCommits();
     }
-    startCommit(instantTime);
+    startCommit(instantTime, actionType);
   }
 
-  private void startCommit(String instantTime) {
+  private void startCommit(String instantTime, String actionType) {
     LOG.info("Generate a new instant time " + instantTime);
     HoodieTableMetaClient metaClient = createMetaClient(true);

Review comment:
       Good point. Will do.







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488284965



##########
File path: hudi-spark/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
##########
@@ -102,7 +104,9 @@ public void commit(WriterCommitMessage[] messages) {
             .flatMap(m -> m.getWriteStatuses().stream().map(m2 -> m2.getStat())).collect(Collectors.toList());
 
     try {
-      writeClient.commitStats(instantTime, writeStatList, Option.empty());
+      writeClient.commitStats(instantTime, writeStatList, Option.empty(),

Review comment:
       Sure. Added new method







[GitHub] [hudi] vinothchandar commented on pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#issuecomment-689812755


   >You suggested to remove HoodieReplaceStat
   
   I think the suggestion was to simplify HoodieReplaceMetadata so that it only contains the extra information about replaced file groups, and to use HoodieCommitMetadata and its HoodieWriteStat for tracking the new file groups written.
   We could have HoodieReplaceStat be part of the WriteStatus itself for tracking the additional information about replaced file groups?
   
   On cleaning vs archival, it would be good if we can implement this in cleaning. But can that be a follow-on item? Practically speaking, typical deployments don't configure cleaning that low. 





[GitHub] [hudi] satishkotha commented on pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#issuecomment-696327041


   > @satishkotha : Please ping me in the PR when you have updates and I can give incremental comments if needed.
   
   @bvaradar IncrementalTimeline restore is the only big pending item. I'll get to it in the later part of this week.





[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488345604



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
##########
@@ -133,6 +137,21 @@
    */
   HoodieTimeline getCommitsAndCompactionTimeline();
 
+  /**
+   * Timeline to just include commits (commit/deltacommit), replace and compaction actions.
+   *
+   * @return
+   */
+  HoodieDefaultTimeline getWriteActionTimeline();

Review comment:
       I removed this method and included the replace action in getCommitsAndCompactionTimeline instead. I think we will run into some edge cases for MOR tables where something breaks, but I don't have concrete examples. We can run it for a while on a MOR table and see if it works.
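
One way to picture a single timeline getter that covers commits, compaction and replace is a filter over action names. The sketch below is an assumption-laden illustration: the action strings are placeholders (the real constants live in HoodieTimeline), and `InstantSketch` is a hypothetical stand-in for an instant.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class TimelineFilterSketch {
  // Hypothetical instant: a timestamp plus an action name.
  static final class InstantSketch {
    final String timestamp;
    final String action;

    InstantSketch(String timestamp, String action) {
      this.timestamp = timestamp;
      this.action = action;
    }
  }

  // Placeholder action names for this sketch only.
  static final Set<String> COMMITS_AND_COMPACTION_ACTIONS =
      new HashSet<>(Arrays.asList("commit", "deltacommit", "compaction", "replace"));

  // Keep only instants whose action is in the allowed set, preserving order.
  static List<String> commitsAndCompactionTimestamps(List<InstantSketch> timeline) {
    return timeline.stream()
        .filter(i -> COMMITS_AND_COMPACTION_ACTIONS.contains(i.action))
        .map(i -> i.timestamp)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<InstantSketch> timeline = Arrays.asList(
        new InstantSketch("001", "commit"),
        new InstantSketch("002", "clean"),
        new InstantSketch("003", "replace"));
    System.out.println(commitsAndCompactionTimestamps(timeline));
  }
}
```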







[GitHub] [hudi] vinothchandar commented on pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#issuecomment-691710534


   @satishkotha can you please resolve all the comments that you have already addressed? That way we can track what's pending.





[GitHub] [hudi] satishkotha edited a comment on pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha edited a comment on pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#issuecomment-687391466


   @vinothchandar @bvaradar  Addressed most of your suggestions. There are a couple of other follow-up items I need your help on:
   
   1) You suggested removing HoodieReplaceStat. I ran into a minor implementation issue removing it. Basically, HoodieWriteClient operations return JavaRDD[WriteStatus]. SparkSqlWriter uses these WriteStatus objects to create metadata (.commit/.replace etc.). Each WriteStatus comes with a HoodieWriteStat (which is expected to be non-null in many places). This HoodieWriteStat is used for many post-commit operations. So if we want to remove HoodieReplaceStat, we can either
   a) Change the signature of WriteClient operations to return a new structured object instead of just returning JavaRDD[WriteStatus]. This object would contain a List[HoodieFileGroupId] for tracking the replaced file groups and a JavaRDD[WriteStatus] for the newly created file groups. We would have to change post-commit operations to look at this new object instead of WriteStatus.
   OR
   b) Return a WriteStatus for replaced file groups too. WriteClient operations can continue to return JavaRDD[WriteStatus]. Each WriteStatus has a HoodieWriteStat, which can be a token value (null?) for replaced file groups.
   
   Either way, this is a somewhat involved change, so I would like to get your feedback before starting implementation. What do you guys think?
   
   2) Deleting replaced file groups during archival vs. clean. I have this deletion logic implemented in archival, per our earlier conversation. But, as I mentioned, this may lead to storage inefficiency. For example: a) clean retain is set to 1 commit; b) archival is done after 24 commits. We keep all the data for replaced files until archival happens.
   
   Let me know if you guys have any other comments.





[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r493881151



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -738,7 +799,9 @@ private String formatPartitionKey(String partitionStr) {
    * @param commitsToReturn Commits
    */
   Stream<FileSlice> fetchLatestFileSliceInRange(List<String> commitsToReturn) {
-    return fetchAllStoredFileGroups().map(fileGroup -> fileGroup.getLatestFileSliceInRange(commitsToReturn))
+    return fetchAllStoredFileGroups()
+        .filter(fileGroup -> !isFileGroupReplacedBeforeAny(fileGroup, commitsToReturn))

Review comment:
       Changed. Please take a look.







[GitHub] [hudi] satishkotha edited a comment on pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha edited a comment on pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#issuecomment-687391466


   @vinothchandar @bvaradar  Addressed most of your suggestions. There are a couple of other follow-up items I need your help on:
   
   1) You suggested removing HoodieReplaceStat. I ran into a minor implementation issue removing it. Basically, HoodieWriteClient operations return JavaRDD[WriteStatus]. SparkSqlWriter uses these WriteStatus objects to create metadata (.commit/.replace etc.). Each WriteStatus comes with a HoodieWriteStat (which is expected to be non-null in many places). This HoodieWriteStat is used for many post-commit operations. So if we want to remove HoodieReplaceStat, we can either
   a) Change the signature of WriteClient operations to return a new structured object instead of just returning JavaRDD[WriteStatus]. This object would contain a List[HoodieFileGroupId] for tracking the replaced file groups and a JavaRDD[WriteStatus] for the newly created file groups. We would have to change post-commit operations to look at this new object instead of WriteStatus.
   OR
   b) Return a WriteStatus for replaced file groups too. WriteClient operations can continue to return JavaRDD[WriteStatus]. Each WriteStatus has a HoodieWriteStat, which can be a token value (null?) for replaced file groups.
   
   Either way, this is a somewhat involved change, so I would like to get your feedback before starting implementation. What do you guys think?
   
   2) Deleting replaced file groups during archival vs. clean. I have this deletion logic implemented in archival, per our earlier conversation. But, as I mentioned, this may lead to storage inefficiency. For example: a) clean retain is set to 1 commit; b) archival is set to be done after 24 commits. We keep all the data for replaced files until archival happens.
   
   Let me know if you guys have any other comments.
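
For reference, option (a) above could look roughly like the sketch below. `ReplaceWriteResult` and its accessors are hypothetical names invented for illustration; a plain `List` stands in for `JavaRDD[WriteStatus]` and strings stand in for `HoodieFileGroupId` so the example runs without Spark or Hudi on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical result type for option (a); none of these names exist in Hudi.
final class ReplaceWriteResult<S> {
  private final List<S> newFileGroupStatuses;      // stands in for JavaRDD[WriteStatus]
  private final List<String> replacedFileGroupIds; // stands in for List[HoodieFileGroupId]

  ReplaceWriteResult(List<S> newFileGroupStatuses, List<String> replacedFileGroupIds) {
    // Defensive copies keep the result immutable after construction.
    this.newFileGroupStatuses = Collections.unmodifiableList(new ArrayList<>(newFileGroupStatuses));
    this.replacedFileGroupIds = Collections.unmodifiableList(new ArrayList<>(replacedFileGroupIds));
  }

  List<S> getNewFileGroupStatuses() {
    return newFileGroupStatuses;
  }

  List<String> getReplacedFileGroupIds() {
    return replacedFileGroupIds;
  }

  // A regular commit would simply carry an empty replaced list.
  boolean isReplace() {
    return !replacedFileGroupIds.isEmpty();
  }
}

class ReplaceWriteResultDemo {
  public static void main(String[] args) {
    ReplaceWriteResult<String> result = new ReplaceWriteResult<>(
        Arrays.asList("status-for-fg-3"), Arrays.asList("fg-1", "fg-2"));
    System.out.println("replaced: " + result.getReplacedFileGroupIds());
    System.out.println("written:  " + result.getNewFileGroupStatuses());
  }
}
```

The trade-off noted in the comment still holds for this shape: every post-commit consumer that today takes the write statuses directly would need to accept the wrapper instead.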





[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r492303122



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
##########
@@ -251,6 +262,28 @@ private void addRollbackInstant(HoodieTimeline timeline, HoodieInstant instant)
     LOG.info("Done Syncing rollback instant (" + instant + ")");
   }
 
+  /**
+   * Add newly found REPLACE instant.
+   *
+   * @param timeline Hoodie Timeline
+   * @param instant REPLACE Instant
+   */
+  private void addReplaceInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {

Review comment:
       I need to understand this flow a bit more, but I have a question about why we need to track the commit action type and timestamp. Today, HoodieRollbackMetadata tracks successFiles, deletedFiles, etc. Do you think we can also add replacedFileIds there? This would be empty for regular commits, but for replace commits it would have some content. If this value is present, we can remove the corresponding fileIds from View#replacedFileGroups. Let me know if I'm missing anything with this approach.
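
The proposal above can be sketched as follows. `RollbackInfo` models only the proposed extra field and `View` models only the replaced-file-group set; both are hypothetical stand-ins for illustration, not the real HoodieRollbackMetadata or file-system view.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class RollbackReplaceSketch {
  // Hypothetical slice of rollback metadata: only the proposed extra field.
  static final class RollbackInfo {
    final List<String> replacedFileIds; // empty when a regular commit is rolled back

    RollbackInfo(List<String> replacedFileIds) {
      this.replacedFileIds = new ArrayList<>(replacedFileIds);
    }
  }

  // Hypothetical view that tracks which file groups are currently replaced.
  static final class View {
    private final Set<String> replacedFileGroups = new HashSet<>();

    void applyReplace(List<String> fileIds) {
      replacedFileGroups.addAll(fileIds);
    }

    // Rolling back a replace makes its replaced file groups visible again.
    void applyRollback(RollbackInfo info) {
      replacedFileGroups.removeAll(info.replacedFileIds);
    }

    boolean isReplaced(String fileId) {
      return replacedFileGroups.contains(fileId);
    }
  }

  public static void main(String[] args) {
    View view = new View();
    view.applyReplace(Arrays.asList("fg-1", "fg-2"));
    view.applyRollback(new RollbackInfo(Arrays.asList("fg-1")));
    System.out.println("fg-1 replaced? " + view.isReplaced("fg-1"));
    System.out.println("fg-2 replaced? " + view.isReplaced("fg-2"));
  }
}
```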







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488215834



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
##########
@@ -94,6 +94,14 @@ public void setWriteStats(List<HoodieWriteStat> writeStats) {
     this.writeStats = Option.of(writeStats);
   }
 
+  public Option<List<HoodieWriteStat>> getReplacetats() {

Review comment:
       This is no longer needed, given that we are tracking replaced files as a boolean in WriteStatus. I removed this.







[GitHub] [hudi] bvaradar commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r495700438



##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -586,24 +602,39 @@ public String startCommit() {
    * @param instantTime Instant time to be generated
    */
   public void startCommitWithTime(String instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    startCommitWithTime(instantTime, metaClient.getCommitActionType(), metaClient);
+  }
+
+  /**
+   * Completes a new commit time for a write operation (insert/update/delete) with specified action.
+   */
+  public void startCommitWithTime(String instantTime, String actionType) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    startCommitWithTime(instantTime, actionType, metaClient);
+  }
+
+  /**
+   * Completes a new commit time for a write operation (insert/update/delete) with specified action.
+   */
+  private void startCommitWithTime(String instantTime, String actionType, HoodieTableMetaClient metaClient) {
     // NOTE : Need to ensure that rollback is done before a new commit is started
     if (rollbackPending) {
       // Only rollback inflight commit/delta-commits. Do not touch compaction commits
       rollbackPendingCommits();
     }
-    startCommit(instantTime);
+    startCommit(instantTime, actionType, metaClient);
   }
 
-  private void startCommit(String instantTime) {
-    LOG.info("Generate a new instant time " + instantTime);
-    HoodieTableMetaClient metaClient = createMetaClient(true);
+  private void startCommit(String instantTime, String actionType, HoodieTableMetaClient metaClient) {

Review comment:
       Let's leave it for now. If needed, we can refactor later.










[GitHub] [hudi] bvaradar commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r490589908



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView

Review comment:
       @satishkotha Here is the plan as we discussed.
   1. Change the signature of fileSystemView.getReplacedFileGroupsBeforeOrOn to also take in partitionId.
   2. In HoodieTimelineArchiveLog.deleteReplacedFileGroups, read the replace metadata (which we are already doing) and, for each partition, call fileSystemView.getReplacedFileGroupsBeforeOrOn().
   3. Step 2 must be done in such a way that we call fileSystemView.getReplacedFileGroupsBeforeOrOn in parallel.
   
   This should allow lazy-loading semantics to be retained in the file-system view.
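
The three steps above can be sketched as below. This is a simplified illustration under stated assumptions: `ReplacedLookup` is a hypothetical interface standing in for the per-partition view call, file groups are plain string ids, and a Java parallel stream stands in for whatever parallel execution the real code would use (for example a Spark job).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class PartitionedReplaceLookupSketch {
  // Hypothetical per-partition lookup, mirroring step 1 (extra partition argument).
  interface ReplacedLookup {
    List<String> getReplacedFileGroupsBeforeOrOn(String maxInstantTime, String partitionPath);
  }

  // Steps 2 and 3: one lookup per partition named in the replace metadata,
  // issued in parallel, so only those partitions need to be loaded by the view.
  static List<String> collectReplaced(Set<String> partitions, String maxInstantTime, ReplacedLookup view) {
    return partitions.parallelStream()
        .flatMap(p -> view.getReplacedFileGroupsBeforeOrOn(maxInstantTime, p).stream())
        .sorted() // deterministic output order for the sketch
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Map<String, List<String>> replacedByPartition = new HashMap<>();
    replacedByPartition.put("2020/09/01", Arrays.asList("fg-b", "fg-a"));
    replacedByPartition.put("2020/09/02", Arrays.asList("fg-c"));

    List<String> toDelete = collectReplaced(
        replacedByPartition.keySet(), "003",
        (maxInstant, partition) -> replacedByPartition.get(partition));
    System.out.println(toDelete);
  }
}
```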







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r481364171



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -173,29 +180,59 @@ protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
     List<HoodieFileGroup> fileGroups = new ArrayList<>();
     fileIdSet.forEach(pair -> {
       String fileId = pair.getValue();
-      HoodieFileGroup group = new HoodieFileGroup(pair.getKey(), fileId, timeline);
-      if (baseFiles.containsKey(pair)) {
-        baseFiles.get(pair).forEach(group::addBaseFile);
-      }
-      if (logFiles.containsKey(pair)) {
-        logFiles.get(pair).forEach(group::addLogFile);
-      }
+      String partitionPath = pair.getKey();
+      if (isExcludeFileGroup(partitionPath, fileId)) {

Review comment:
       Modified. Please take a look. RocksDB needs to be implemented.







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r490476688



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView
+        .getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+    fileGroupsToDelete.forEach(fg -> {

Review comment:
       Implemented parallel execution. PTAL







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r490621340



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView

Review comment:
       Done. PTAL.







[GitHub] [hudi] vinothchandar commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r486369440



##########
File path: hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -87,44 +88,55 @@ protected AbstractHoodieWriteClient(JavaSparkContext jsc, HoodieIndex index, Hoo
    * Commit changes performed at the given instantTime marker.
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses) {
-    return commit(instantTime, writeStatuses, Option.empty());
+    HoodieTableMetaClient metaClient = createMetaClient(false);
+    String actionType = metaClient.getCommitActionType();
+    return commit(instantTime, writeStatuses, Option.empty(), actionType);

Review comment:
       Can't we just call `commit(String, JavaRDD, Option.empty())` here, without having to implement this?
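
One reading of this suggestion, sketched with simplified types: the two-argument overload only delegates, and the three-argument overload is the single place that resolves the table's default action type. `WriteClientSketch` is a hypothetical stand-in for `AbstractHoodieWriteClient`; it uses `List<String>` and `java.util.Optional` in place of `JavaRDD<WriteStatus>` and Hudi's `Option`, and the `lastActionType` field exists only to make the delegation observable.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for AbstractHoodieWriteClient; not Hudi code.
class WriteClientSketch {
  private final String tableCommitActionType; // e.g. "commit" or "deltacommit"
  String lastActionType;                      // recorded only so the delegation is observable

  WriteClientSketch(String tableCommitActionType) {
    this.tableCommitActionType = tableCommitActionType;
  }

  // Two-argument overload: no action-type lookup here, just delegate.
  boolean commit(String instantTime, List<String> writeStatuses) {
    return commit(instantTime, writeStatuses, Optional.empty());
  }

  // Three-argument overload: the one place that resolves the table's default action type.
  boolean commit(String instantTime, List<String> writeStatuses,
                 Optional<Map<String, String>> extraMetadata) {
    return commit(instantTime, writeStatuses, extraMetadata, tableCommitActionType);
  }

  // Four-argument overload: callers such as replace pass an explicit action type.
  boolean commit(String instantTime, List<String> writeStatuses,
                 Optional<Map<String, String>> extraMetadata, String actionType) {
    this.lastActionType = actionType;
    return true;
  }
}
```

With this shape, only the replace path needs to name its action type explicitly; every other caller keeps the existing signatures.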







[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488345849



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
##########
@@ -371,6 +371,47 @@ void removeBootstrapBaseFileMapping(Stream<BootstrapBaseFileMapping> bootstrapBa
         schemaHelper.getPrefixForSliceViewByPartitionFile(partitionPath, fileId)).map(Pair::getValue)).findFirst());
   }
 
+  @Override
+  protected void resetReplacedFileGroups(final Map<HoodieFileGroupId, HoodieInstant> replacedFileGroups) {

Review comment:
       Yes, we do a delete and insert. It would definitely be helpful to have someone with more experience review this.
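
A rough illustration of the delete-and-insert reset described here, assuming a sorted key-value layout. It does not use the RocksDB API: a `TreeMap` stands in for the store, and the `replaced/` key prefix, the class name, and the string-typed keys and values are all hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; a TreeMap stands in for the sorted RocksDB-backed store.
class ReplacedFileGroupStoreSketch {
  private static final String PREFIX = "replaced/"; // assumed key prefix, for illustration only
  private final TreeMap<String, String> store = new TreeMap<>();

  void put(String key, String value) {
    store.put(key, value);
  }

  /** Drops every existing replaced-file-group entry, then writes the new mapping. */
  void resetReplacedFileGroups(Map<String, String> replacedFileGroups) {
    // Delete: clear the key range covered by the prefix, leaving other entries untouched.
    store.subMap(PREFIX, PREFIX + Character.MAX_VALUE).clear();
    // Insert: write the fresh file-group-id -> replacing-instant entries.
    replacedFileGroups.forEach((fileGroupId, instant) -> store.put(PREFIX + fileGroupId, instant));
  }

  Map<String, String> snapshot() {
    return new TreeMap<>(store);
  }
}
```

Because the two steps are separate, a reader could observe the store between them; whether that matters for the real view is exactly the kind of thing a reviewer familiar with the RocksDB view would need to confirm.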







[GitHub] [hudi] satishkotha edited a comment on pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha edited a comment on pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#issuecomment-696327041


   > @satishkotha : Please ping me in the PR when you have updates and I can give incremental comments if needed.
   
   @bvaradar Incremental FileSystem restore is the only big pending item. I'll get to it later this week.





[GitHub] [hudi] satishkotha commented on pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
satishkotha commented on pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#issuecomment-690711545


   @vinothchandar As discussed, I added a boolean in WriteStatus and removed HoodieReplaceStat. See this [diff](https://github.com/apache/hudi/pull/2048/commits/94b275dbd20ec82ebe568b47bb28447d92ab996f). I committed it as a separate git SHA because it still looks somewhat awkward IMO. Please take a look; I can revert it or reimplement it in a different way.
   
   Also, I created https://issues.apache.org/jira/browse/HUDI-1276 for cleaning replaced files during clean.
   
   I also renamed 'replace' to 'replacecommit' everywhere, as you suggested.
   
   Please let me know if you have additional comments or suggestions.





[GitHub] [hudi] bvaradar commented on pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

Posted by GitBox <gi...@apache.org>.
bvaradar commented on pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#issuecomment-699784611


   @satishkotha : once the conflicts are resolved, we can land this.

