Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/06/22 14:48:49 UTC

[GitHub] [hudi] lw309637554 opened a new pull request #1756: Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

lw309637554 opened a new pull request #1756:
URL: https://github.com/apache/hudi/pull/1756


   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contributing.html before opening a pull request.*
   
   ## What is the purpose of the pull request
   
   Add more tests for MarkerFiles, RollbackUtils, and RollbackActionExecutor, covering both the marker-based and file-listing rollback modes.
   Also fix the following bugs in the RollbackActionExecutor for the marker and file-listing modes:
   
   - In HoodieWriteClient.java, postCommit should not call deleteMarkerDir; otherwise a rollback in marker-files mode will fail.
   - In ListingBasedRollbackHelper.java, the check `(path.toString().endsWith(HoodieFileFormat.HOODIE_LOG.getFileExtension()))` cannot detect log files, since log file names carry a version suffix after the `.log` extension (see the sketch below).
   - In MarkerBasedRollbackStrategy.java, baseCommitTime should be derived via `FSUtils.getCommitTime(baseFilePathForAppend.getName())`.
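   
   A minimal sketch of the log-file check issue, for illustration only (the example file name follows the usual `.{fileId}_{baseCommit}.log.{version}` naming, and the pattern-based `FSUtils.isLogFile` alternative is my assumption of the cleaner check, not a quote of this PR's diff):
   
   ```java
   import org.apache.hadoop.fs.Path;
   import org.apache.hudi.common.fs.FSUtils;
   import org.apache.hudi.common.model.HoodieFileFormat;
   
   public class LogFileCheckSketch {
     public static void main(String[] args) {
       String logFileName = ".id11_001.log.1";
       // endsWith(".log") misses real log files, because the version number follows the extension
       boolean byExtension = logFileName.endsWith(HoodieFileFormat.HOODIE_LOG.getFileExtension()); // false
       // a pattern-based check matches the full log-file naming convention
       boolean byPattern = FSUtils.isLogFile(new Path(logFileName)); // true
       System.out.println(byExtension + " " + byPattern);
     }
   }
   ```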
   
   ## Brief change log
   
   *(for example:)*
     - *Modify AnnotationLocation checkstyle rule in checkstyle.xml*
   
   ## Verify this pull request
   
   *(Please pick either of the following options)*
   
   This pull request is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   (or)
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
     - *Added integration tests for end-to-end.*
     - *Added HoodieClientWriteTest to verify the change.*
     - *Manually verified the change by running a job locally.*
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on a change in pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#discussion_r456965828



##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
##########
@@ -97,28 +98,9 @@ public Path makeNewPath(String partitionPath) {
    *
    * @param partitionPath Partition path
    */
-  protected void createMarkerFile(String partitionPath) {
-    Path markerPath = makeNewMarkerPath(partitionPath);
-    try {
-      LOG.info("Creating Marker Path=" + markerPath);
-      fs.create(markerPath, false).close();
-    } catch (IOException e) {
-      throw new HoodieException("Failed to create marker file " + markerPath, e);
-    }
-  }
-
-  /**
-   * THe marker path will be <base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.
-   */
-  private Path makeNewMarkerPath(String partitionPath) {

Review comment:
       all of this stuff is now encapsulated into a `MarkerFiles` class
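
       A minimal usage sketch of that encapsulation, pieced together from the constructor and createMarkerFile call visible elsewhere in this PR (the handle fields hoodieTable, instantTime, partitionPath, dataFileName and getIOType() are assumed to be in scope, as in HoodieWriteHandle):
       
       ```java
       // inside the write handle: marker creation is delegated to MarkerFiles
       MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
       markerFiles.createMarkerFile(partitionPath, dataFileName, getIOType());
       ```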

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -264,6 +275,7 @@ public void archive(List<HoodieInstant> instants) throws HoodieCommitException {
       List<IndexedRecord> records = new ArrayList<>();
       for (HoodieInstant hoodieInstant : instants) {
         try {
+          deleteAnyLeftOverMarkerFiles(hoodieInstant);

Review comment:
       During archival of either the commit instant or the corresponding rollback, any left-over marker dir will be deleted, or else the archival will fail. There is a test added for this.
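
       A rough sketch of what deleteAnyLeftOverMarkerFiles could look like, assuming it delegates to MarkerFiles.deleteMarkerDir; only the method name appears in the diff above, so the body and exception behaviour are assumptions:
       
       ```java
       private void deleteAnyLeftOverMarkerFiles(HoodieInstant instant) {
         // remove any marker dir the instant may have left behind; a HoodieIOException
         // thrown here surfaces and fails the archival, matching the behaviour described above
         MarkerFiles markers = new MarkerFiles(metaClient.getFs(), metaClient.getBasePath(),
             metaClient.getMarkerFolderPath(instant.getTimestamp()), instant.getTimestamp());
         markers.deleteMarkerDir();
       }
       ```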

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -410,72 +412,54 @@ public void deleteMarkerDir(String instantTs) {
    * @param consistencyCheckEnabled Consistency Check Enabled
    * @throws HoodieIOException
    */
-  protected void cleanFailedWrites(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats,
-      boolean consistencyCheckEnabled) throws HoodieIOException {
+  protected void reconcileAgainstMarkers(JavaSparkContext jsc,
+                                         String instantTs,
+                                         List<HoodieWriteStat> stats,
+                                         boolean consistencyCheckEnabled) throws HoodieIOException {
     try {
       // Reconcile marker and data files with WriteStats so that partially written data-files due to failed
       // (but succeeded on retry) tasks are removed.
       String basePath = getMetaClient().getBasePath();
-      FileSystem fs = getMetaClient().getFs();
-      Path markerDir = new Path(metaClient.getMarkerFolderPath(instantTs));
+      MarkerFiles markers = new MarkerFiles(this, instantTs);
 
-      if (!fs.exists(markerDir)) {
-        // Happens when all writes are appends
+      if (!markers.doesMarkerDirExist()) {
+        // can happen if it was an empty write say.
         return;
       }
 
-      final String baseFileExtension = getBaseFileFormat().getFileExtension();
-      List<String> invalidDataPaths = FSUtils.getAllDataFilesForMarkers(fs, basePath, instantTs, markerDir.toString(),
-          baseFileExtension);
-      List<String> validDataPaths = stats.stream().map(w -> String.format("%s/%s", basePath, w.getPath()))
-          .filter(p -> p.endsWith(baseFileExtension)).collect(Collectors.toList());
+      // we are not including log appends here, since they are already fail-safe.

Review comment:
       A quick skim of the changes here would be nice.
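
       For that quick skim: the reconciliation now boils down to a set difference between marker-derived data paths and the paths recorded in the WriteStats. A pseudocode-level sketch (imports omitted, and path normalization between the two sides is glossed over, so treat this as the idea rather than the exact code):
       
       ```java
       // base files that have a marker for this instant (log appends are excluded, as noted above)
       Set<String> leftoverPaths = new HashSet<>(markers.createdAndMergedDataPaths());
       // paths that actually made it into the commit's WriteStats
       Set<String> committedPaths = stats.stream()
           .map(stat -> String.format("%s/%s", basePath, stat.getPath()))
           .collect(Collectors.toSet());
       // anything with a marker but no WriteStat was written by a failed/retried task
       leftoverPaths.removeAll(committedPaths);
       // ... delete every path left in leftoverPaths before the commit is finalized
       ```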

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.IOType;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Operates on marker files for a given write action (commit, delta commit, compaction).
+ */
+public class MarkerFiles {
+
+  private static final Logger LOG = LogManager.getLogger(MarkerFiles.class);
+
+  public static String stripMarkerSuffix(String path) {
+    return path.substring(0, path.indexOf(HoodieTableMetaClient.MARKER_EXTN));
+  }
+
+  private final String instantTime;
+  private final FileSystem fs;
+  private final Path markerDirPath;
+  private final String basePath;
+
+  public MarkerFiles(FileSystem fs, String basePath, String markerFolderPath, String instantTime) {
+    this.instantTime = instantTime;
+    this.fs = fs;
+    this.markerDirPath = new Path(markerFolderPath);
+    this.basePath = basePath;
+  }
+
+  public MarkerFiles(HoodieTable<?> table, String instantTime) {
+    this(table.getMetaClient().getFs(),
+        table.getMetaClient().getBasePath(),
+        table.getMetaClient().getMarkerFolderPath(instantTime),
+        instantTime);
+  }
+
+  public void quietDeleteMarkerDir() {
+    try {
+      deleteMarkerDir();
+    } catch (HoodieIOException ioe) {
+      LOG.warn("Error deleting marker directory for instant " + instantTime, ioe);
+    }
+  }
+
+  /**
+   * Delete Marker directory corresponding to an instant.
+   */
+  public boolean deleteMarkerDir() {

Review comment:
       @umehrot2 we can now add the parallelization changes here, for the deletion of marker files.
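
       Not part of this PR, but a sketch of what the suggested parallelization could look like (the helper shape, the use of JavaSparkContext here, and the parallelism handling are assumptions for illustration; standard Hadoop/Spark imports omitted):
       
       ```java
       static void deleteMarkerDirInParallel(JavaSparkContext jsc, FileSystem fs,
                                             Path markerDirPath, int parallelism) throws IOException {
         List<String> subPaths = Arrays.stream(fs.listStatus(markerDirPath))
             .map(status -> status.getPath().toString())
             .collect(Collectors.toList());
         if (!subPaths.isEmpty()) {
           jsc.parallelize(subPaths, Math.max(1, Math.min(parallelism, subPaths.size())))
              .foreach(subPathStr -> {
                // executors build their own FileSystem handle; the driver-side fs is not serializable
                Path subPath = new Path(subPathStr);
                subPath.getFileSystem(new Configuration()).delete(subPath, true);
              });
         }
         fs.delete(markerDirPath, true); // finally drop the (now mostly empty) marker dir itself
       }
       ```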

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.io.IOType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Performs rollback using marker files generated during the write..
+ */
+public class MarkerBasedRollbackStrategy implements BaseRollbackActionExecutor.RollbackStrategy {
+
+  private static final Logger LOG = LogManager.getLogger(MarkerBasedRollbackStrategy.class);
+
+  private final HoodieTable<?> table;
+
+  private final transient JavaSparkContext jsc;
+
+  private final HoodieWriteConfig config;
+
+  private final String basePath;
+
+  private final String instantTime;
+
+  public MarkerBasedRollbackStrategy(HoodieTable<?> table, JavaSparkContext jsc, HoodieWriteConfig config, String instantTime) {
+    this.table = table;
+    this.jsc = jsc;
+    this.basePath = table.getMetaClient().getBasePath();
+    this.config = config;
+    this.instantTime = instantTime;
+  }
+
+  private HoodieRollbackStat undoMerge(String mergedBaseFilePath) throws IOException {
+    LOG.info("Rolling back by deleting the merged base file:" + mergedBaseFilePath);
+    return deleteBaseFile(mergedBaseFilePath);
+  }
+
+  private HoodieRollbackStat undoCreate(String createdBaseFilePath) throws IOException {
+    LOG.info("Rolling back by deleting the created base file:" + createdBaseFilePath);
+    return deleteBaseFile(createdBaseFilePath);
+  }
+
+  private HoodieRollbackStat deleteBaseFile(String baseFilePath) throws IOException {

Review comment:
       This is already resilient to attempting to delete a non-existent file; a marker file does not necessarily imply the data file is there.

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -332,9 +333,12 @@ public static SparkConf registerClasses(SparkConf conf) {
   }
 
   @Override
-  protected void postCommit(HoodieCommitMetadata metadata, String instantTime,
-      Option<Map<String, String>> extraMetadata) {
+  protected void postCommit(HoodieTable<?> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
     try {
+
+      // Delete the marker directory for the instant.

Review comment:
       this PR will change behavior for marker dir deletion, with or without marker based rollback turned on. 
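
       What the change amounts to in postCommit, as a short sketch (placement relative to the rest of the post-commit work is an assumption):
       
       ```java
       // inside HoodieWriteClient#postCommit: the instant is committed, so its markers are
       // no longer needed for rollback; deletion is best-effort and must not fail the commit
       new MarkerFiles(table, instantTime).quietDeleteMarkerDir();
       // ... then continue with the usual post-commit work (cleaning, archiving, etc.)
       ```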

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.io.IOType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Performs rollback using marker files generated during the write..
+ */
+public class MarkerBasedRollbackStrategy implements BaseRollbackActionExecutor.RollbackStrategy {
+
+  private static final Logger LOG = LogManager.getLogger(MarkerBasedRollbackStrategy.class);
+
+  private final HoodieTable<?> table;
+
+  private final transient JavaSparkContext jsc;
+
+  private final HoodieWriteConfig config;
+
+  private final String basePath;
+
+  private final String instantTime;
+
+  public MarkerBasedRollbackStrategy(HoodieTable<?> table, JavaSparkContext jsc, HoodieWriteConfig config, String instantTime) {
+    this.table = table;
+    this.jsc = jsc;
+    this.basePath = table.getMetaClient().getBasePath();
+    this.config = config;
+    this.instantTime = instantTime;
+  }
+
+  private HoodieRollbackStat undoMerge(String mergedBaseFilePath) throws IOException {
+    LOG.info("Rolling back by deleting the merged base file:" + mergedBaseFilePath);
+    return deleteBaseFile(mergedBaseFilePath);
+  }
+
+  private HoodieRollbackStat undoCreate(String createdBaseFilePath) throws IOException {
+    LOG.info("Rolling back by deleting the created base file:" + createdBaseFilePath);
+    return deleteBaseFile(createdBaseFilePath);
+  }
+
+  private HoodieRollbackStat deleteBaseFile(String baseFilePath) throws IOException {
+    Path fullDeletePath = new Path(basePath, baseFilePath);
+    String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), fullDeletePath.getParent());
+    boolean isDeleted = table.getMetaClient().getFs().delete(fullDeletePath);
+    return HoodieRollbackStat.newBuilder()
+        .withPartitionPath(partitionPath)
+        .withDeletedFileResult(baseFilePath, isDeleted)
+        .build();
+  }
+
+  private HoodieRollbackStat undoAppend(String appendBaseFilePath, HoodieInstant instantToRollback) throws IOException, InterruptedException {
+    Path baseFilePathForAppend = new Path(basePath, appendBaseFilePath);
+    String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
+    String baseCommitTime = FSUtils.getCommitTime(baseFilePathForAppend.getName());
+    String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), new Path(basePath, appendBaseFilePath).getParent());
+
+    HoodieLogFormat.Writer writer = null;
+    try {
+      Path partitionFullPath = FSUtils.getPartitionPath(basePath, partitionPath);
+
+      if (!table.getMetaClient().getFs().exists(partitionFullPath)) {
+        return HoodieRollbackStat.newBuilder()
+            .withPartitionPath(partitionPath)
+            .build();
+      }
+      writer = HoodieLogFormat.newWriterBuilder()

Review comment:
       Checked that the log scanner can deal with spurious rollback blocks, i.e. rollbacks logged without any data blocks for that instant.
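
       For reference, a sketch of how such a rollback command block typically ends up in the log (imports omitted; the header keys and builder chain follow the usual HoodieLogFormat API, so treat the exact calls as assumptions rather than a quote of this PR's diff):
       
       ```java
       HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
           .onParentPath(partitionFullPath)
           .withFileId(fileId)
           .overBaseCommit(baseCommitTime)
           .withFs(table.getMetaClient().getFs())
           .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
           .build();
       
       // the command block carries only headers; on read, the scanner interprets it as
       // "roll back the previous block for this target instant" and, as checked above,
       // tolerates the case where no data block for that instant was ever written
       Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
       header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
       header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, instantToRollback.getTimestamp());
       header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
           String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
       writer.appendBlock(new HoodieCommandBlock(header));
       ```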




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lw309637554 commented on a change in pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#discussion_r449948302



##########
File path: hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieRollbackStat;
+
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class TestMergeOnReadRollbackActionExecutor extends HoodieClientTestBase {
+  @Override
+  protected HoodieTableType getTableType() {
+    return HoodieTableType.MERGE_ON_READ;
+  }
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initPath();
+    initSparkContexts();
+    //just generate tow partitions
+    dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
+    initFileSystem();
+    initMetaClient();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private void twoUpsertCommitDataRollBack(boolean isUsingMarkers) throws IOException, InterruptedException {

Review comment:
       Yes, that makes sense.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lw309637554 commented on pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#issuecomment-652546754


   > > For rolling back a successful commit, in HoodieWriteClient.java I removed the deleteMarkerDir() in postCommit when in usingMarkers mode. But it will double the file count in DFS.
   > 
   > I think delaying marker deletion till cleaning is probably ok, but the reconciliation with data files, i.e. the deletion of extraneous data files written due to Spark stage retries, must be handled pre-commit.
   > 
   > > If the marker files are retained, should we clean them when the data file is cleaned, and should we also archive the marker files when archiving commits (archiveCommitsWith)?
   > 
   > There is no need to archive the marker files in my opinion. The contract in Hudi is that once an instant leaves the active timeline, its effects are permanent on the table. So if a rollback needs to happen based on marker files, it needs to be within the retained commits of the active timeline. I think this is a practical approach.
   > 
   > Think of the active timeline as the transaction log, with pending/inflight/completed actions.
   
   Thanks, I agree with you. I will update the PR.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#issuecomment-659584280


   thanks @lw309637554  actually was working on fixing it myself :). I am working on the marker dir deletion now. Will push to this branch later today. 
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lw309637554 edited a comment on pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
lw309637554 edited a comment on pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#issuecomment-650096565


   > Took a quick pass at the three test classes you have added.. LGTM .
   > Will do a detailed pass once you confirm PR is indeed ready..
   
   @vinothchandar hello, I have added more tests for end-to-end rollback, and all Hudi unit tests pass.
   There are a few questions to discuss with you:
   1. For rolling back a successful commit, in HoodieWriteClient.java I removed the deleteMarkerDir() in postCommit when in usingMarkers mode. But it will double the file count in DFS.
   2. If the marker files are retained, should we clean them when the data file is cleaned, and should we also archive the marker files when archiving commits (archiveCommitsWith)?
   
   If these two questions are clarified and solved, I think the patch will be ok.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lw309637554 commented on pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#issuecomment-653270400


   > I will do a deep review of this over the weekend.. and see if I fix few things and merge.. thanks @lw309637554 !
   
   Thanks, I added the marker-files cleanup in pre-commit, for Spark stage retries.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lw309637554 commented on a change in pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#discussion_r455474421



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -182,12 +162,12 @@ private HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, HoodieRol
    * Common method used for cleaning out parquet files under a partition path during rollback of a set of commits.
    */
   private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
-      Map<FileStatus, Boolean> results, String commit, String partitionPath) throws IOException {
+                                                      String commit, String partitionPath) throws IOException {
+    final Map<FileStatus, Boolean> results = new HashMap<>();
     LOG.info("Cleaning path " + partitionPath);
     FileSystem fs = metaClient.getFs();
-    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
     PathFilter filter = (path) -> {
-      if (path.toString().contains(basefileExtension)) {
+      if (path.toString().contains(HoodieFileFormat.PARQUET.getFileExtension())) {

Review comment:
       Got it. It is because the merge with master was not handled well. Let me fix it.
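
       A sketch of the fix being agreed on here: derive the extension from the table config (mirroring the line the diff removed) instead of hard-coding parquet; the body of the filter is my paraphrase, not the exact code:
       
       ```java
       String baseFileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
       PathFilter filter = (path) -> {
         if (path.toString().contains(baseFileExtension)) {
           // only pick base files written by the commit being rolled back
           String fileCommitTime = FSUtils.getCommitTime(path.getName());
           return commit.equals(fileCommitTime);
         }
         return false;
       };
       ```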




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#issuecomment-650728664


   > For rolling back a successful commit, in HoodieWriteClient.java I removed the deleteMarkerDir() in postCommit when in usingMarkers mode. But it will double the file count in DFS.
   
   I think delaying marker deletion till cleaning is probably ok, but the reconciliation with data files, i.e. the deletion of extraneous data files written due to Spark stage retries, must be handled pre-commit.
   
   > If the marker files are retained, should we clean them when the data file is cleaned, and should we also archive the marker files when archiving commits (archiveCommitsWith)?
   
   There is no need to archive the marker files in my opinion. The contract in Hudi is that once an instant leaves the active timeline, its effects are permanent on the table. So if a rollback needs to happen based on marker files, it needs to be within the retained commits of the active timeline. I think this is a practical approach.
   
   Think of the active timeline as the transaction log, with pending/inflight/completed actions.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lw309637554 commented on pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#issuecomment-659747189


   > thanks @lw309637554 actually was working on fixing it myself :). I am working on the marker dir deletion now. Will push to this branch later today.
   
   okay


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lw309637554 commented on a change in pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#discussion_r449947875



##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
##########
@@ -97,28 +98,9 @@ public Path makeNewPath(String partitionPath) {
    *
    * @param partitionPath Partition path
    */
-  protected void createMarkerFile(String partitionPath) {
-    Path markerPath = makeNewMarkerPath(partitionPath);
-    try {
-      LOG.info("Creating Marker Path=" + markerPath);
-      fs.create(markerPath, false).close();
-    } catch (IOException e) {
-      throw new HoodieException("Failed to create marker file " + markerPath, e);
-    }
-  }
-
-  /**
-   * THe marker path will be <base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.
-   */
-  private Path makeNewMarkerPath(String partitionPath) {
-    Path markerRootPath = new Path(hoodieTable.getMetaClient().getMarkerFolderPath(instantTime));
-    Path path = FSUtils.getPartitionPath(markerRootPath, partitionPath);
-    try {
-      fs.mkdirs(path); // create a new partition as needed.
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to make dir " + path, e);
-    }
-    return new Path(path.toString(), FSUtils.makeMarkerFile(instantTime, writeToken, fileId));
+  protected void createMarkerFile(String partitionPath, String dataFileName) {
+    MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
+    markerFiles.createMarkerFile(partitionPath, dataFileName, getIOType());

Review comment:
       Hi, I think keeping it as createMarkerFile will be clearer.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar merged pull request #1756: [HUDI-839] Introducing support for rollbacks using marker files

Posted by GitBox <gi...@apache.org>.
vinothchandar merged pull request #1756:
URL: https://github.com/apache/hudi/pull/1756


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on a change in pull request #1756: [HUDI-839] Introducing support for rollbacks using marker files

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#discussion_r457651663



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.IOType;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Operates on marker files for a given write action (commit, delta commit, compaction).
+ */
+public class MarkerFiles {
+
+  private static final Logger LOG = LogManager.getLogger(MarkerFiles.class);
+
+  public static String stripMarkerSuffix(String path) {
+    return path.substring(0, path.indexOf(HoodieTableMetaClient.MARKER_EXTN));
+  }
+
+  private final String instantTime;
+  private final FileSystem fs;
+  private final Path markerDirPath;
+  private final String basePath;
+
+  public MarkerFiles(FileSystem fs, String basePath, String markerFolderPath, String instantTime) {
+    this.instantTime = instantTime;
+    this.fs = fs;
+    this.markerDirPath = new Path(markerFolderPath);
+    this.basePath = basePath;
+  }
+
+  public MarkerFiles(HoodieTable<?> table, String instantTime) {
+    this(table.getMetaClient().getFs(),
+        table.getMetaClient().getBasePath(),
+        table.getMetaClient().getMarkerFolderPath(instantTime),
+        instantTime);
+  }
+
+  public void quietDeleteMarkerDir() {
+    try {
+      deleteMarkerDir();
+    } catch (HoodieIOException ioe) {
+      LOG.warn("Error deleting marker directory for instant " + instantTime, ioe);
+    }
+  }
+
+  /**
+   * Delete Marker directory corresponding to an instant.
+   */
+  public boolean deleteMarkerDir() {
+    try {
+      boolean result = fs.delete(markerDirPath, true);
+      if (result) {
+        LOG.info("Removing marker directory at " + markerDirPath);
+      } else {
+        LOG.info("No marker directory to delete at " + markerDirPath);
+      }
+      return result;
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+  }
+
+  public boolean doesMarkerDirExist() throws IOException {
+    return fs.exists(markerDirPath);
+  }
+
+  public List<String> createdAndMergedDataPaths() throws IOException {
+    List<String> dataFiles = new LinkedList<>();
+    FSUtils.processFiles(fs, markerDirPath.toString(), (status) -> {
+      String pathStr = status.getPath().toString();
+      if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {

Review comment:
       Yes. We will be performing an upgrade to 0.6.0 anyway, which will list the inflight instants at the time of upgrade and then subsequently write compatible, corresponding marker files.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #1756: [HUDI-839] Introducing support for rollbacks using marker files

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#issuecomment-661633207


   @lw309637554 Looks good. Planning to merge after CI passes this time. Thanks a lot for your contributions. This is one very important PR!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lw309637554 commented on pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#issuecomment-648541535


   > Took a quick pass at the three test classes you have added.. LGTM .
   > Will do a detailed pass once you confirm PR is indeed ready..
   
   Thanks, I will fix the failed unit tests.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lw309637554 commented on a change in pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#discussion_r449946532



##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -278,6 +286,11 @@ public WriteStatus getWriteStatus() {
     return writeStatus;
   }
 
+  @Override
+  public MarkerFiles.MarkerType getIOType() {

Review comment:
       Changing it to IOType will be better.
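
       A sketch of the IOType idea being preferred here, with the values inferred from the create/merge/append rollback paths elsewhere in this PR (treat the exact set of constants as an assumption):
       
       ```java
       public enum IOType {
         CREATE,  // handle wrote a brand-new base file (insert path)
         MERGE,   // handle rewrote an existing base file (update/merge path)
         APPEND   // handle appended to a log file (HoodieAppendHandle)
       }
       ```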




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lw309637554 commented on pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#issuecomment-650096565


   > Took a quick pass at the three test classes you have added.. LGTM .
   > Will do a detailed pass once you confirm PR is indeed ready..
   
   @vinothchandar hello, I have added more tests for end-to-end rollback, and all Hudi unit tests pass.
   There are a few questions to discuss with you:
   1. For rolling back a successful commit, in HoodieWriteClient.java I removed the deleteMarkerDir() in postCommit. But it will double the file count in DFS.
   2. If the marker files are retained, should we clean them when the data file is cleaned, and should we also archive the marker files when archiving commits (archiveCommitsWith)?
   
   If these two questions are clarified and solved, I think the patch will be ok. Then I will add a new patch that will not conflict with master. Thanks.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on a change in pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#discussion_r444577094



##########
File path: hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientTestBase {
+  @BeforeEach
+  public void setUp() throws Exception {
+    initPath();
+    initSparkContexts();
+    //just generate tow partitions
+    dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
+    initFileSystem();
+    initMetaClient();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  @Test
+  public void testCopyOnWriteRollbackActionExecutorForFileListingAsGenerateFile() throws IOException {
+    // Let's create some commit files and parquet files
+    String commitTime1 = "001";
+    String commitTime2 = "002";
+    new File(basePath + "/.hoodie").mkdirs();
+    HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{"2015/03/16", "2015/03/17", "2016/03/15"},
+        basePath);
+    HoodieTestUtils.createCommitFiles(basePath, commitTime1, commitTime2);
+
+    // Make commit1
+    String file11 = HoodieTestUtils.createDataFile(basePath, "2015/03/16", commitTime1, "id11");
+    HoodieTestUtils.createNewLogFile(fs, basePath, "2015/03/16",
+        commitTime1, "id11", Option.of(3));
+    String file12 = HoodieTestUtils.createDataFile(basePath, "2015/03/17", commitTime1, "id12");
+
+    // Make commit2
+    String file21 = HoodieTestUtils.createDataFile(basePath, "2015/03/16", commitTime2, "id21");
+    String file22 = HoodieTestUtils.createDataFile(basePath, "2015/03/17", commitTime2, "id22");
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
+    HoodieTable table = this.getHoodieTable(metaClient, config);
+    HoodieInstant needRollBackInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "002");
+
+    // execute CopyOnWriteRollbackActionExecutor with filelisting mode
+    CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(jsc, config, table, "003", needRollBackInstant, true);
+    assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
+    List<HoodieRollbackStat> hoodieRollbackStats = copyOnWriteRollbackActionExecutor.executeRollback();
+
+    // assert hoodieRollbackStats
+    assertEquals(hoodieRollbackStats.size(), 3);
+    hoodieRollbackStats.forEach(stat -> {
+      if (stat.getPartitionPath().equals("2015/03/16")) {
+        assertEquals(1, stat.getSuccessDeleteFiles().size());
+        assertEquals(0, stat.getFailedDeleteFiles().size());
+        assertEquals(null, stat.getCommandBlocksCount());
+        assertEquals("file:" + HoodieTestUtils.getDataFilePath(basePath, "2015/03/16", commitTime2, file21),
+            stat.getSuccessDeleteFiles().get(0));
+      } else if (stat.getPartitionPath().equals("2015/03/17")) {
+        assertEquals(1, stat.getSuccessDeleteFiles().size());
+        assertEquals(0, stat.getFailedDeleteFiles().size());
+        assertEquals(null, stat.getCommandBlocksCount());
+        assertEquals("file:" + HoodieTestUtils.getDataFilePath(basePath, "2015/03/17", commitTime2, file22),
+            stat.getSuccessDeleteFiles().get(0));
+      } else if (stat.getPartitionPath().equals("2015/03/17")) {
+        assertEquals(0, stat.getSuccessDeleteFiles().size());
+        assertEquals(0, stat.getFailedDeleteFiles().size());
+        assertEquals(null, stat.getCommandBlocksCount());
+      }
+    });
+
+    assertTrue(HoodieTestUtils.doesCommitExist(basePath, "001"));
+    assertTrue(HoodieTestUtils.doesInflightExist(basePath, "001"));
+    assertFalse(HoodieTestUtils.doesCommitExist(basePath, "002"));
+    assertFalse(HoodieTestUtils.doesInflightExist(basePath, "002"));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, "2015/03/16", commitTime1, file11)
+        && HoodieTestUtils.doesDataFileExist(basePath, "2015/03/17", commitTime1, file12));
+    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, "2015/03/16", commitTime2, file21)
+        || HoodieTestUtils.doesDataFileExist(basePath, "2015/03/17", commitTime2, file22));
+  }
+
+  private void twoUpsertCommitDataRollBack(boolean isUsingMarkers) throws IOException {
+    //1. prepare data
+    HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(isUsingMarkers).build();

Review comment:
       cc @xushiyan I feel that moving these two commits / this template code into some TestHelper would clean up the code a lot.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lw309637554 commented on a change in pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#discussion_r449947358



##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -113,8 +109,9 @@ private void init(String fileId, String partitionPath, HoodieBaseFile dataFileTo
       partitionMetadata.trySave(getPartitionId());
 
       oldFilePath = new Path(config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath);
+      String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId);

Review comment:
       Yes, that will be more generally useful; Hudi will support more base formats.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lw309637554 commented on a change in pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#discussion_r449948082



##########
File path: hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
##########
@@ -445,10 +442,20 @@ public void testCOWToMORConvertedTableRollback(HoodieFileFormat baseFileFormat)
 
   @ParameterizedTest
   @MethodSource("argumentsProvider")
-  public void testRollbackWithDeltaAndCompactionCommit(HoodieFileFormat baseFileFormat) throws Exception {
+  public void testCOWToMORConvertedTableRollbackUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {

Review comment:
       ok




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lw309637554 commented on pull request #1756: [HUDI-839] Introducing support for rollbacks using marker files

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#issuecomment-661690560


   > @lw309637554 Looks good. Planning to merge after CI passes this time. Thanks a lot for your contributions. This is one very important PR!
   
   
   Great, a very good collaboration experience. I can start on other work.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lw309637554 commented on a change in pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#discussion_r449948557



##########
File path: hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientTestBase {
+  @BeforeEach
+  public void setUp() throws Exception {
+    initPath();
+    initSparkContexts();
+    //just generate tow partitions

Review comment:
       ok

##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
##########
@@ -97,28 +98,9 @@ public Path makeNewPath(String partitionPath) {
    *
    * @param partitionPath Partition path
    */
-  protected void createMarkerFile(String partitionPath) {
-    Path markerPath = makeNewMarkerPath(partitionPath);
-    try {
-      LOG.info("Creating Marker Path=" + markerPath);
-      fs.create(markerPath, false).close();
-    } catch (IOException e) {
-      throw new HoodieException("Failed to create marker file " + markerPath, e);
-    }
-  }
-
-  /**
-   * THe marker path will be <base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.
-   */
-  private Path makeNewMarkerPath(String partitionPath) {
-    Path markerRootPath = new Path(hoodieTable.getMetaClient().getMarkerFolderPath(instantTime));
-    Path path = FSUtils.getPartitionPath(markerRootPath, partitionPath);
-    try {
-      fs.mkdirs(path); // create a new partition as needed.
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to make dir " + path, e);
-    }
-    return new Path(path.toString(), FSUtils.makeMarkerFile(instantTime, writeToken, fileId));
+  protected void createMarkerFile(String partitionPath, String dataFileName) {
+    MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
+    markerFiles.createMarkerFile(partitionPath, dataFileName, getIOType());

Review comment:
       > nit: we can just name this `create()`
   
   Yes, that will be better.

##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
##########
@@ -408,6 +416,16 @@ public void testCopyOnWriteTable() throws Exception {
     checkReadRecords("000", 2 * numRecords);
   }
 
+  @Test
+  public void testCopyOnWriteTableUsingFileListRollBack() throws Exception {
+    testCopyOnWriteTable(false);
+  }
+
+  @Test
+  public void testCopyOnWriteTableUsingMarkersRollBack() throws Exception {

Review comment:
       Yes, I will remove it.







[GitHub] [hudi] lw309637554 commented on a change in pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#discussion_r449947972



##########
File path: hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
##########
@@ -904,6 +901,19 @@ public void testCleanMarkerDataFilesOnRollback() throws IOException {
     assertEquals(0, getTotalTempFiles(), "All temp files are deleted.");
   }
 
+  /**
+   * Test Cleaning functionality of table.rollback() API.
+   */
+  @Test
+  public void testCleanMarkerDataFilesOnRollbackUsingFileList() throws IOException {

Review comment:
       ok







[GitHub] [hudi] lw309637554 commented on a change in pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#discussion_r449947875



##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
##########
@@ -97,28 +98,9 @@ public Path makeNewPath(String partitionPath) {
    *
    * @param partitionPath Partition path
    */
-  protected void createMarkerFile(String partitionPath) {
-    Path markerPath = makeNewMarkerPath(partitionPath);
-    try {
-      LOG.info("Creating Marker Path=" + markerPath);
-      fs.create(markerPath, false).close();
-    } catch (IOException e) {
-      throw new HoodieException("Failed to create marker file " + markerPath, e);
-    }
-  }
-
-  /**
-   * THe marker path will be <base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.
-   */
-  private Path makeNewMarkerPath(String partitionPath) {
-    Path markerRootPath = new Path(hoodieTable.getMetaClient().getMarkerFolderPath(instantTime));
-    Path path = FSUtils.getPartitionPath(markerRootPath, partitionPath);
-    try {
-      fs.mkdirs(path); // create a new partition as needed.
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to make dir " + path, e);
-    }
-    return new Path(path.toString(), FSUtils.makeMarkerFile(instantTime, writeToken, fileId));
+  protected void createMarkerFile(String partitionPath, String dataFileName) {
+    MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
+    markerFiles.createMarkerFile(partitionPath, dataFileName, getIOType());

Review comment:
       Hi, I think keeping the name createMarkerFile is clearer.







[GitHub] [hudi] lw309637554 edited a comment on pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
lw309637554 edited a comment on pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#issuecomment-650096565


   > Took a quick pass at the three test classes you have added.. LGTM.
   > Will do a detailed pass once you confirm the PR is indeed ready..
   
   @vinothchandar hello, I have added more tests for end-to-end rollback, and all Hudi unit tests pass.
   There are a few questions to discuss with you:
   1. For rolling back a successful commit, I removed the deleteMarkerDir() call from postCommit in HoodieWriteClient.java when running in marker-based rollback mode. But retaining the markers will double the number of files in DFS.
   2. If the marker files are retained, should we clean them when the corresponding data files are cleaned, and should we also archive the marker files in archiveCommitsWith?
   
   If these two questions are clarified and solved, I think the patch will be ready. Then I will push a new patch that does not conflict with master. Thanks
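
A minimal sketch of the gating described in point 1 above, assuming the getRollBackUsingMarkers() config accessor and the MarkerFiles helper that appear in the diffs elsewhere in this thread; the body below is illustrative, not the actual implementation:

```java
@Override
protected void postCommit(HoodieTable<?> table, HoodieCommitMetadata metadata, String instantTime,
                          Option<Map<String, String>> extraMetadata) {
  // Sketch only: eagerly delete the marker directory only when rollback does not
  // rely on markers (listing-based mode). Marker-based rollback needs the markers
  // to survive until the rollback/cleanup path removes them.
  if (!config.getRollBackUsingMarkers()) {
    new MarkerFiles(table, instantTime).quietDeleteMarkerDir();
  }
  // ... remaining post-commit work (cleaning, archiving) unchanged
}
```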





[GitHub] [hudi] vinothchandar commented on pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#issuecomment-648490993


   @lw309637554 is this ready for review? Seems like we have unit test failures?





[GitHub] [hudi] vinothchandar commented on a change in pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#discussion_r455279709



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -182,12 +162,12 @@ private HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, HoodieRol
    * Common method used for cleaning out parquet files under a partition path during rollback of a set of commits.
    */
   private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
-      Map<FileStatus, Boolean> results, String commit, String partitionPath) throws IOException {
+                                                      String commit, String partitionPath) throws IOException {
+    final Map<FileStatus, Boolean> results = new HashMap<>();
     LOG.info("Cleaning path " + partitionPath);
     FileSystem fs = metaClient.getFs();
-    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
     PathFilter filter = (path) -> {
-      if (path.toString().contains(basefileExtension)) {
+      if (path.toString().contains(HoodieFileFormat.PARQUET.getFileExtension())) {

Review comment:
       @lw309637554 any reason for changing this to PARQUET? (or was that me?)







[GitHub] [hudi] vinothchandar commented on a change in pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#discussion_r449786823



##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -113,8 +109,9 @@ private void init(String fileId, String partitionPath, HoodieBaseFile dataFileTo
       partitionMetadata.trySave(getPartitionId());
 
       oldFilePath = new Path(config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath);
+      String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId);

Review comment:
       probably need to ensure we are getting the base file format extension from the hoodieTable instance? 
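
A small sketch of this suggestion; note that the four-argument makeDataFileName overload taking an explicit extension is an assumption for illustration, not necessarily the real API:

```java
// Sketch: pick up the extension from the table instead of assuming .parquet.
// hoodieTable, instantTime, writeToken and fileId are the handle's fields, as in the diff.
String extension = hoodieTable.getBaseFileFormat().getFileExtension();
String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, extension);
```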

##########
File path: hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieRollbackStat;
+
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class TestMergeOnReadRollbackActionExecutor extends HoodieClientTestBase {
+  @Override
+  protected HoodieTableType getTableType() {
+    return HoodieTableType.MERGE_ON_READ;
+  }
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initPath();
+    initSparkContexts();
+    //just generate tow partitions
+    dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
+    initFileSystem();
+    initMetaClient();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private void twoUpsertCommitDataRollBack(boolean isUsingMarkers) throws IOException, InterruptedException {

Review comment:
       Any way to share code with the COW test?

##########
File path: hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
##########
@@ -328,6 +330,18 @@ public void testSimpleTagLocationAndUpdateWithRollback(IndexType indexType) thro
     assert (javaRDD.filter(record -> record.getCurrentLocation() != null).collect().size() == 0);
   }
 
+  @ParameterizedTest
+  @EnumSource(value = IndexType.class, names = {"BLOOM", "GLOBAL_BLOOM", "SIMPLE", "GLOBAL_SIMPLE"})

Review comment:
       Similar here.. this test probably does not need to test these two modes? e.g. what additional coverage are we getting over the schema evolution test by doing this?

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Performs rollback using marker files generated during the write..
+ */
+public class MarkerBasedRollbackStrategy implements BaseRollbackActionExecutor.RollbackStrategy {
+
+  private static final Logger LOG = LogManager.getLogger(MarkerBasedRollbackStrategy.class);
+
+  private final HoodieTable<?> table;
+
+  private final transient JavaSparkContext jsc;
+
+  private final HoodieWriteConfig config;
+
+  private final String basePath;
+
+  private final String instantTime;
+
+  public MarkerBasedRollbackStrategy(HoodieTable<?> table, JavaSparkContext jsc, HoodieWriteConfig config, String instantTime) {
+    this.table = table;
+    this.jsc = jsc;
+    this.basePath = table.getMetaClient().getBasePath();
+    this.config = config;
+    this.instantTime = instantTime;
+  }
+
+  private HoodieRollbackStat undoMerge(String mergedBaseFilePath) throws IOException {
+    LOG.info("Rolling back by deleting the merged base file:" + mergedBaseFilePath);
+    return deleteBaseFile(mergedBaseFilePath);
+  }
+
+  private HoodieRollbackStat undoCreate(String createdBaseFilePath) throws IOException {
+    LOG.info("Rolling back by deleting the created base file:" + createdBaseFilePath);
+

Review comment:
       nit: extra line

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -54,29 +53,28 @@
 /**
  * Performs Rollback of Hoodie Tables.
  */
-public class RollbackHelper implements Serializable {
+public class ListingBasedRollbackHelper implements Serializable {
 
-  private static final Logger LOG = LogManager.getLogger(RollbackHelper.class);
+  private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class);
 
   private final HoodieTableMetaClient metaClient;
   private final HoodieWriteConfig config;
 
-  public RollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
+  public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
     this.metaClient = metaClient;
     this.config = config;
   }
 
   /**
    * Performs all rollback actions that we have collected in parallel.
    */
-  public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc, HoodieInstant instantToRollback, List<RollbackRequest> rollbackRequests) {
+  public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
 
-    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
     SerializablePathFilter filter = (path) -> {
-      if (path.toString().contains(basefileExtension)) {
+      if (path.toString().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {

Review comment:
       these could be replaced with base format from the table object?
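
One possible shape for this, mirroring the pattern the removed lines above already used; isBaseFile is a hypothetical helper name, and metaClient is the helper class's existing field:

```java
// Hypothetical helper: resolve the extension from the table config instead of
// hard-coding PARQUET, so non-parquet base file formats are matched as well.
private boolean isBaseFile(Path path) {
  String baseFileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
  return path.toString().endsWith(baseFileExtension);
}
```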

##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -278,6 +286,11 @@ public WriteStatus getWriteStatus() {
     return writeStatus;
   }
 
+  @Override
+  public MarkerFiles.MarkerType getIOType() {

Review comment:
       Reflecting on this, probably good to rename `MarkerType` to `IOType`.. 

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -332,9 +333,11 @@ public static SparkConf registerClasses(SparkConf conf) {
   }
 
   @Override
-  protected void postCommit(HoodieCommitMetadata metadata, String instantTime,
-      Option<Map<String, String>> extraMetadata) {
+  protected void postCommit(HoodieTable<?> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
     try {
+      if (!config.getRollBackUsingMarkers()) {

Review comment:
       I am wondering if we should do this always.. having this logic be rollback-dependent can become hard to reason about in the long run

##########
File path: hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientTestBase {
+  @BeforeEach
+  public void setUp() throws Exception {
+    initPath();
+    initSparkContexts();
+    //just generate tow partitions

Review comment:
       typo: two

##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
##########
@@ -97,28 +98,9 @@ public Path makeNewPath(String partitionPath) {
    *
    * @param partitionPath Partition path
    */
-  protected void createMarkerFile(String partitionPath) {
-    Path markerPath = makeNewMarkerPath(partitionPath);
-    try {
-      LOG.info("Creating Marker Path=" + markerPath);
-      fs.create(markerPath, false).close();
-    } catch (IOException e) {
-      throw new HoodieException("Failed to create marker file " + markerPath, e);
-    }
-  }
-
-  /**
-   * THe marker path will be <base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.
-   */
-  private Path makeNewMarkerPath(String partitionPath) {
-    Path markerRootPath = new Path(hoodieTable.getMetaClient().getMarkerFolderPath(instantTime));
-    Path path = FSUtils.getPartitionPath(markerRootPath, partitionPath);
-    try {
-      fs.mkdirs(path); // create a new partition as needed.
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to make dir " + path, e);
-    }
-    return new Path(path.toString(), FSUtils.makeMarkerFile(instantTime, writeToken, fileId));
+  protected void createMarkerFile(String partitionPath, String dataFileName) {
+    MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
+    markerFiles.createMarkerFile(partitionPath, dataFileName, getIOType());

Review comment:
       nit: we can just name this `create()` 

##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
##########
@@ -408,6 +416,16 @@ public void testCopyOnWriteTable() throws Exception {
     checkReadRecords("000", 2 * numRecords);
   }
 
+  @Test
+  public void testCopyOnWriteTableUsingFileListRollBack() throws Exception {
+    testCopyOnWriteTable(false);
+  }
+
+  @Test
+  public void testCopyOnWriteTableUsingMarkersRollBack() throws Exception {

Review comment:
       These combinations added to the schema evolution test are a bit hard to understand. Why would we be testing rollback modes in a schema evolution test? Any particular reason?

##########
File path: hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
##########
@@ -445,10 +442,20 @@ public void testCOWToMORConvertedTableRollback(HoodieFileFormat baseFileFormat)
 
   @ParameterizedTest
   @MethodSource("argumentsProvider")
-  public void testRollbackWithDeltaAndCompactionCommit(HoodieFileFormat baseFileFormat) throws Exception {
+  public void testCOWToMORConvertedTableRollbackUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {

Review comment:
       these may be worth adding in both modes.. 

##########
File path: hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
##########
@@ -904,6 +901,19 @@ public void testCleanMarkerDataFilesOnRollback() throws IOException {
     assertEquals(0, getTotalTempFiles(), "All temp files are deleted.");
   }
 
+  /**
+   * Test Cleaning functionality of table.rollback() API.
+   */
+  @Test
+  public void testCleanMarkerDataFilesOnRollbackUsingFileList() throws IOException {

Review comment:
       same comment here.. 







[GitHub] [hudi] lw309637554 commented on pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#issuecomment-654941314


   @vinothchandar I have fixed all the issues per your comments. A more comprehensive solution for deleting the marker files is up to you.





[GitHub] [hudi] lw309637554 commented on pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#issuecomment-653975959


   > Thanks for adding the tests.. left some comments..
   > 
   > I am trying to cross check the deletion of the marker dir once more (the issue you mentioned before in prev iteration).. I will push an update to the branch.. and it should be good to go.
   
   Thanks, I will fix the issues per the comments. A more comprehensive solution for deleting the marker files is up to you.





[GitHub] [hudi] vinothchandar commented on pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#issuecomment-655173307


   Thanks! on it! 





[GitHub] [hudi] bvaradar commented on a change in pull request #1756: [HUDI-839] Introducing support for rollbacks using marker files

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#discussion_r457007064



##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -632,6 +638,10 @@ public FileSystemViewStorageConfig getClientSpecifiedViewStorageConfig() {
     return clientSpecifiedViewStorageConfig;
   }
 
+  public boolean getRollBackUsingMarkers() {

Review comment:
       Is this needed since we already have shouldRollbackUsingMarkers()?

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -410,72 +412,54 @@ public void deleteMarkerDir(String instantTs) {
    * @param consistencyCheckEnabled Consistency Check Enabled
    * @throws HoodieIOException
    */
-  protected void cleanFailedWrites(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats,
-      boolean consistencyCheckEnabled) throws HoodieIOException {
+  protected void reconcileAgainstMarkers(JavaSparkContext jsc,
+                                         String instantTs,
+                                         List<HoodieWriteStat> stats,
+                                         boolean consistencyCheckEnabled) throws HoodieIOException {
     try {
       // Reconcile marker and data files with WriteStats so that partially written data-files due to failed
       // (but succeeded on retry) tasks are removed.
       String basePath = getMetaClient().getBasePath();
-      FileSystem fs = getMetaClient().getFs();
-      Path markerDir = new Path(metaClient.getMarkerFolderPath(instantTs));
+      MarkerFiles markers = new MarkerFiles(this, instantTs);
 
-      if (!fs.exists(markerDir)) {
-        // Happens when all writes are appends
+      if (!markers.doesMarkerDirExist()) {
+        // can happen if it was an empty write say.
         return;
       }
 
-      final String baseFileExtension = getBaseFileFormat().getFileExtension();
-      List<String> invalidDataPaths = FSUtils.getAllDataFilesForMarkers(fs, basePath, instantTs, markerDir.toString(),
-          baseFileExtension);
-      List<String> validDataPaths = stats.stream().map(w -> String.format("%s/%s", basePath, w.getPath()))
-          .filter(p -> p.endsWith(baseFileExtension)).collect(Collectors.toList());
+      // we are not including log appends here, since they are already fail-safe.

Review comment:
       This looks good to me. 

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.IOType;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Operates on marker files for a given write action (commit, delta commit, compaction).
+ */
+public class MarkerFiles {
+
+  private static final Logger LOG = LogManager.getLogger(MarkerFiles.class);
+
+  public static String stripMarkerSuffix(String path) {
+    return path.substring(0, path.indexOf(HoodieTableMetaClient.MARKER_EXTN));
+  }
+
+  private final String instantTime;
+  private final FileSystem fs;
+  private final Path markerDirPath;
+  private final String basePath;
+
+  public MarkerFiles(FileSystem fs, String basePath, String markerFolderPath, String instantTime) {
+    this.instantTime = instantTime;
+    this.fs = fs;
+    this.markerDirPath = new Path(markerFolderPath);
+    this.basePath = basePath;
+  }
+
+  public MarkerFiles(HoodieTable<?> table, String instantTime) {
+    this(table.getMetaClient().getFs(),
+        table.getMetaClient().getBasePath(),
+        table.getMetaClient().getMarkerFolderPath(instantTime),
+        instantTime);
+  }
+
+  public void quietDeleteMarkerDir() {
+    try {
+      deleteMarkerDir();
+    } catch (HoodieIOException ioe) {
+      LOG.warn("Error deleting marker directory for instant " + instantTime, ioe);
+    }
+  }
+
+  /**
+   * Delete Marker directory corresponding to an instant.
+   */
+  public boolean deleteMarkerDir() {
+    try {
+      boolean result = fs.delete(markerDirPath, true);
+      if (result) {
+        LOG.info("Removing marker directory at " + markerDirPath);
+      } else {
+        LOG.info("No marker directory to delete at " + markerDirPath);
+      }
+      return result;
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+  }
+
+  public boolean doesMarkerDirExist() throws IOException {
+    return fs.exists(markerDirPath);
+  }
+
+  public List<String> createdAndMergedDataPaths() throws IOException {
+    List<String> dataFiles = new LinkedList<>();
+    FSUtils.processFiles(fs, markerDirPath.toString(), (status) -> {
+      String pathStr = status.getPath().toString();
+      if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {

Review comment:
       This is fine (backwards compatible) as we are not currently writing marker files for appends, right?

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -264,6 +275,7 @@ public void archive(List<HoodieInstant> instants) throws HoodieCommitException {
       List<IndexedRecord> records = new ArrayList<>();
       for (HoodieInstant hoodieInstant : instants) {
         try {
+          deleteAnyLeftOverMarkerFiles(hoodieInstant);

Review comment:
       Ack

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -332,9 +333,12 @@ public static SparkConf registerClasses(SparkConf conf) {
   }
 
   @Override
-  protected void postCommit(HoodieCommitMetadata metadata, String instantTime,
-      Option<Map<String, String>> extraMetadata) {
+  protected void postCommit(HoodieTable<?> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
     try {
+
+      // Delete the marker directory for the instant.

Review comment:
       Ack.







[GitHub] [hudi] lw309637554 commented on a change in pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#discussion_r449947922



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -54,29 +53,28 @@
 /**
  * Performs Rollback of Hoodie Tables.
  */
-public class RollbackHelper implements Serializable {
+public class ListingBasedRollbackHelper implements Serializable {
 
-  private static final Logger LOG = LogManager.getLogger(RollbackHelper.class);
+  private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class);
 
   private final HoodieTableMetaClient metaClient;
   private final HoodieWriteConfig config;
 
-  public RollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
+  public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
     this.metaClient = metaClient;
     this.config = config;
   }
 
   /**
    * Performs all rollback actions that we have collected in parallel.
    */
-  public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc, HoodieInstant instantToRollback, List<RollbackRequest> rollbackRequests) {
+  public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
 
-    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
     SerializablePathFilter filter = (path) -> {
-      if (path.toString().contains(basefileExtension)) {
+      if (path.toString().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {

Review comment:
       Yes, getting the format from the table object would be better.

##########
File path: hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
##########
@@ -328,6 +330,18 @@ public void testSimpleTagLocationAndUpdateWithRollback(IndexType indexType) thro
     assert (javaRDD.filter(record -> record.getCurrentLocation() != null).collect().size() == 0);
   }
 
+  @ParameterizedTest
+  @EnumSource(value = IndexType.class, names = {"BLOOM", "GLOBAL_BLOOM", "SIMPLE", "GLOBAL_SIMPLE"})

Review comment:
       ok







[GitHub] [hudi] lw309637554 commented on a change in pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#discussion_r449946413



##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -332,9 +333,11 @@ public static SparkConf registerClasses(SparkConf conf) {
   }
 
   @Override
-  protected void postCommit(HoodieCommitMetadata metadata, String instantTime,
-      Option<Map<String, String>> extraMetadata) {
+  protected void postCommit(HoodieTable<?> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
     try {
+      if (!config.getRollBackUsingMarkers()) {

Review comment:
       I also think it is not so good. I think we can avoid deleting the marker files here, delete any unused marker files in pre-commit, and let clean remove the old marker files.
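
A rough sketch of that deferred cleanup, similar in spirit to the deleteAnyLeftOverMarkerFiles(...) call seen in the HoodieTimelineArchiveLog diff earlier in this thread; the method body here is a guess using the MarkerFiles API under review, not the actual implementation:

```java
// Illustrative only: remove any left-over marker directory for an instant when it
// is archived/cleaned, rather than deleting it eagerly in postCommit.
private void deleteAnyLeftOverMarkerFiles(HoodieInstant instant) {
  // `table` is assumed to be the enclosing class's HoodieTable reference.
  new MarkerFiles(table, instant.getTimestamp()).quietDeleteMarkerDir();
}
```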







[GitHub] [hudi] vinothchandar commented on pull request #1756: [HUDI-839] Introducing support for rollbacks using marker files

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#issuecomment-660780134


   A test is failing on CI (Linux?) while it passes locally.. Looking.





[GitHub] [hudi] lw309637554 commented on a change in pull request #1756: [HUDI-839] Adding unit test for MarkerFiles,RollbackUtils, RollbackActionExecutor for markers and filelisting

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1756:
URL: https://github.com/apache/hudi/pull/1756#discussion_r455475745



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -182,12 +162,12 @@ private HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, HoodieRol
    * Common method used for cleaning out parquet files under a partition path during rollback of a set of commits.
    */
   private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
-      Map<FileStatus, Boolean> results, String commit, String partitionPath) throws IOException {
+                                                      String commit, String partitionPath) throws IOException {
+    final Map<FileStatus, Boolean> results = new HashMap<>();
     LOG.info("Cleaning path " + partitionPath);
     FileSystem fs = metaClient.getFs();
-    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
     PathFilter filter = (path) -> {
-      if (path.toString().contains(basefileExtension)) {
+      if (path.toString().contains(HoodieFileFormat.PARQUET.getFileExtension())) {

Review comment:
       done



