Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/04/23 02:23:40 UTC

[GitHub] [incubator-hudi] hddong opened a new pull request #1554: [HUDI-704]Add test for RepairsCommand

hddong opened a new pull request #1554:
URL: https://github.com/apache/incubator-hudi/pull/1554


   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contributing.html before opening a pull request.*
   
   ## What is the purpose of the pull request
   
   *Add test for RepairsCommand in hudi-cli module*
   
   ## Brief change log
   
     - *Add test for RepairsCommand*
   
   ## Verify this pull request
   
   This change added tests and can be verified by running the new `TestRepairsCommand` and `ITTestRepairsCommand` classes in the hudi-cli module.
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] codecov-io commented on pull request #1554: [HUDI-704]Add test for RepairsCommand

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #1554:
URL: https://github.com/apache/incubator-hudi/pull/1554#issuecomment-625178468


   # [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1554?src=pr&el=h1) Report
   > Merging [#1554](https://codecov.io/gh/apache/incubator-hudi/pull/1554?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-hudi/commit/506447fd4fde4cd922f7aa8f4e17a7f06666dc97&el=desc) will **increase** coverage by `0.06%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-hudi/pull/1554/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/incubator-hudi/pull/1554?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #1554      +/-   ##
   ============================================
   + Coverage     71.82%   71.88%   +0.06%     
   - Complexity      294     1087     +793     
   ============================================
     Files           385      385              
     Lines         16549    16553       +4     
     Branches       1661     1663       +2     
   ============================================
   + Hits          11886    11899      +13     
   + Misses         3931     3928       -3     
   + Partials        732      726       -6     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-hudi/pull/1554?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...oop/realtime/HoodieParquetRealtimeInputFormat.java](https://codecov.io/gh/apache/incubator-hudi/pull/1554/diff?src=pr&el=tree#diff-aHVkaS1oYWRvb3AtbXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaGFkb29wL3JlYWx0aW1lL0hvb2RpZVBhcnF1ZXRSZWFsdGltZUlucHV0Rm9ybWF0LmphdmE=) | `72.34% <0.00%> (ø)` | `0.00% <0.00%> (ø%)` | |
   | [...n/scala/org/apache/hudi/HoodieSparkSqlWriter.scala](https://codecov.io/gh/apache/incubator-hudi/pull/1554/diff?src=pr&el=tree#diff-aHVkaS1zcGFyay9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2h1ZGkvSG9vZGllU3BhcmtTcWxXcml0ZXIuc2NhbGE=) | `53.33% <0.00%> (+0.53%)` | `0.00% <0.00%> (ø%)` | |
   | [.../org/apache/hudi/index/bloom/HoodieBloomIndex.java](https://codecov.io/gh/apache/incubator-hudi/pull/1554/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaW5kZXgvYmxvb20vSG9vZGllQmxvb21JbmRleC5qYXZh) | `97.36% <0.00%> (+0.87%)` | `18.00% <0.00%> (+18.00%)` | |
   | [...ava/org/apache/hudi/common/model/HoodieRecord.java](https://codecov.io/gh/apache/incubator-hudi/pull/1554/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL21vZGVsL0hvb2RpZVJlY29yZC5qYXZh) | `82.75% <0.00%> (+1.72%)` | `13.00% <0.00%> (+13.00%)` | |
   | [.../org/apache/hudi/execution/LazyInsertIterable.java](https://codecov.io/gh/apache/incubator-hudi/pull/1554/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL0xhenlJbnNlcnRJdGVyYWJsZS5qYXZh) | `84.31% <0.00%> (+3.92%)` | `8.00% <0.00%> (+8.00%)` | |
   | [...i/index/bloom/BucketizedBloomCheckPartitioner.java](https://codecov.io/gh/apache/incubator-hudi/pull/1554/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaW5kZXgvYmxvb20vQnVja2V0aXplZEJsb29tQ2hlY2tQYXJ0aXRpb25lci5qYXZh) | `97.87% <0.00%> (+4.25%)` | `3.00% <0.00%> (+3.00%)` | |
   | [...n/java/org/apache/hudi/common/model/HoodieKey.java](https://codecov.io/gh/apache/incubator-hudi/pull/1554/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL21vZGVsL0hvb2RpZUtleS5qYXZh) | `94.44% <0.00%> (+5.55%)` | `4.00% <0.00%> (+4.00%)` | |
   | [...ache/hudi/common/fs/inline/InMemoryFileSystem.java](https://codecov.io/gh/apache/incubator-hudi/pull/1554/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2ZzL2lubGluZS9Jbk1lbW9yeUZpbGVTeXN0ZW0uamF2YQ==) | `89.65% <0.00%> (+10.34%)` | `0.00% <0.00%> (ø%)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1554?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1554?src=pr&el=footer). Last update [506447f...95b9e09](https://codecov.io/gh/apache/incubator-hudi/pull/1554?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


----------------------------------------------------------------



[GitHub] [incubator-hudi] yanghua commented on a change in pull request #1554: [HUDI-704]Add test for RepairsCommand

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #1554:
URL: https://github.com/apache/incubator-hudi/pull/1554#discussion_r420784920



##########
File path: hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
##########
@@ -64,19 +69,35 @@ public String deduplicate(
       @CliOption(key = {"repairedOutputPath"}, help = "Location to place the repaired files",
           mandatory = true) final String repairedOutputPath,
       @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path",
-          mandatory = true) final String sparkPropertiesPath)
+          unspecifiedDefaultValue = "") String sparkPropertiesPath,
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master,

Review comment:
       `"Spark Master "` -> `"Spark Master"`?

##########
File path: hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
##########
@@ -64,19 +69,35 @@ public String deduplicate(
       @CliOption(key = {"repairedOutputPath"}, help = "Location to place the repaired files",
           mandatory = true) final String repairedOutputPath,
       @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path",
-          mandatory = true) final String sparkPropertiesPath)
+          unspecifiedDefaultValue = "") String sparkPropertiesPath,
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master,
+      @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
+          help = "Spark executor memory") final String sparkMemory,
+      @CliOption(key = {"dryrun"},
+          help = "Should we actually remove duplicates or just run and store result to repairedOutputPath",
+          unspecifiedDefaultValue = "true") final boolean dryRun)
       throws Exception {
+    if (StringUtils.isNullOrEmpty(sparkPropertiesPath)) {
+      sparkPropertiesPath =
+          Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
+    }
+
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), duplicatedPartitionPath, repairedOutputPath,
-        HoodieCLI.getTableMetaClient().getBasePath());
+    sparkLauncher.addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), master, sparkMemory,

Review comment:
       Same suggestion as above: should we try to define a data structure for these arguments? We can refactor it later.
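
A rough sketch of the kind of argument holder this comment is pointing at; the class and field names are illustrative and not part of the existing Hudi codebase:

```java
// Hypothetical holder for the deduplicate launcher arguments, so callers pass one
// object instead of a long positional list.
final class DeduplicateArgs {
  final String master;
  final String sparkMemory;
  final String duplicatedPartitionPath;
  final String repairedOutputPath;
  final String basePath;
  final boolean dryRun;

  DeduplicateArgs(String master, String sparkMemory, String duplicatedPartitionPath,
                  String repairedOutputPath, String basePath, boolean dryRun) {
    this.master = master;
    this.sparkMemory = sparkMemory;
    this.duplicatedPartitionPath = duplicatedPartitionPath;
    this.repairedOutputPath = repairedOutputPath;
    this.basePath = basePath;
    this.dryRun = dryRun;
  }

  // Flatten into the positional form SparkLauncher#addAppArgs expects today.
  String[] toAppArgs(String command) {
    return new String[] {command, master, sparkMemory, duplicatedPartitionPath,
        repairedOutputPath, basePath, String.valueOf(dryRun)};
  }
}
```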

##########
File path: hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
##########
@@ -73,8 +73,8 @@ public static void main(String[] args) throws Exception {
         returnCode = rollback(jsc, args[1], args[2]);
         break;
       case DEDUPLICATE:
-        assert (args.length == 4);
-        returnCode = deduplicatePartitionPath(jsc, args[1], args[2], args[3]);
+        assert (args.length == 7);
+        returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], args[6]);

Review comment:
       IMHO, we also need to refactor the argument parsing, but not in this PR.
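
Illustrative only (not part of this PR): one way the positional-argument handling in SparkMain could be hardened in a follow-up, replacing the bare assert with an explicit check. The helper name below is hypothetical:

```java
// Hypothetical guard that fails loudly when the launcher passes the wrong number of
// arguments, instead of relying on `assert`, which is disabled unless the JVM runs with -ea.
private static void validateArgCount(String[] args, int expected, SparkCommand command) {
  if (args.length != expected) {
    throw new IllegalArgumentException(
        command + " expects " + expected + " arguments but got " + args.length);
  }
}

// Usage inside main():
//   case DEDUPLICATE:
//     validateArgCount(args, 7, SparkCommand.DEDUPLICATE);
//     returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], args[6]);
//     break;
```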

##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.RepairsCommand;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.spark.sql.Dataset;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.spark.sql.functions.lit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test class for {@link RepairsCommand#deduplicate}.
+ * <p/>
+ * A command that uses SparkLauncher needs to load jars under lib, which are generated during mvn package.
+ * Use an integration test instead of a unit test.
+ */
+public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
+
+  private String duplicatedPartitionPath;
+  private String repairedOutputPath;
+
+  @BeforeEach
+  public void init() throws IOException, URISyntaxException {
+    String tablePath = basePath + File.separator + "test_table";
+    duplicatedPartitionPath = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    repairedOutputPath = basePath + File.separator + "tmp";
+
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+    // generate 200 records
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+
+    String fileName1 = "1_0_20160401010101.parquet";
+    String fileName2 = "2_0_20160401010101.parquet";
+
+    List<HoodieRecord> hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName1, hoodieRecords1, schema, null, false);
+    List<HoodieRecord> hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName2, hoodieRecords2, schema, null, false);
+
+    // generate commit file
+    String fileId1 = UUID.randomUUID().toString();
+    String testWriteToken = "1-0-1";
+    String commitTime = FSUtils.getCommitTime(fileName1);
+    Files.createFile(Paths.get(duplicatedPartitionPath + "/"
+        + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime, 1, testWriteToken)));
+    Files.createFile(Paths.get(tablePath + "/.hoodie/" + commitTime + ".commit"));
+
+    // read records and get 10 to generate duplicates
+    Dataset df = sqlContext.read().parquet(duplicatedPartitionPath);
+
+    String fileName3 = "3_0_20160401010202.parquet";
+    commitTime = FSUtils.getCommitTime(fileName3);
+    df.limit(10).withColumn("_hoodie_commit_time", lit(commitTime))
+        .write().parquet(duplicatedPartitionPath + File.separator + fileName3);
+    Files.createFile(Paths.get(tablePath + "/.hoodie/" + commitTime + ".commit"));
+
+    metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+  }
+
+  /**
+   * Test case for dry run deduplicate.
+   */
+  @Test
+  public void testDeduplicate() throws IOException {
+    // get fs and check number of latest files
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+        fs.listStatus(new Path(duplicatedPartitionPath)));
+    List<String> filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+    assertEquals(3, filteredStatuses.size(), "There should be 3 files.");
+
+    // Before deduplicate, all files contain 210 records
+    String[] files = filteredStatuses.toArray(new String[0]);
+    Dataset df = sqlContext.read().parquet(files);
+    assertEquals(210, df.count());
+
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String cmdStr = "repair deduplicate --duplicatedPartitionPath " + partitionPath
+        + " --repairedOutputPath " + repairedOutputPath + " --sparkMaster local";
+    CommandResult cr = getShell().executeCommand(cmdStr);
+    assertTrue(cr.isSuccess());
+    assertEquals(RepairsCommand.DEDUPLICATE_RETURN_PREFIX + repairedOutputPath, cr.getResult().toString());
+
+    // After deduplicate, there are 200 records
+    FileStatus[] fileStatus = fs.listStatus(new Path(repairedOutputPath));
+    files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new);
+    Dataset result = sqlContext.read().parquet(files);
+    assertEquals(200, result.count());
+  }
+
+  /**
+   * Test case for real run deduplicate.
+   */
+  @Test
+  public void testDeduplicateWithReal() throws IOException {
+    // get fs and check number of latest files
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+        fs.listStatus(new Path(duplicatedPartitionPath)));
+    List<String> filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+    assertEquals(3, filteredStatuses.size(), "There should be 3 files.");
+
+    // Before deduplicate, all files contain 210 records
+    String[] files = filteredStatuses.toArray(new String[0]);
+    Dataset df = sqlContext.read().parquet(files);
+    assertEquals(210, df.count());
+
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String cmdStr = "repair deduplicate --duplicatedPartitionPath " + partitionPath

Review comment:
       ditto

##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.RepairsCommand;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.spark.sql.Dataset;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.spark.sql.functions.lit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test class for {@link RepairsCommand#deduplicate}.
+ * <p/>
+ * A command that uses SparkLauncher needs to load jars under lib, which are generated during mvn package.
+ * Use an integration test instead of a unit test.
+ */
+public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
+
+  private String duplicatedPartitionPath;
+  private String repairedOutputPath;
+
+  @BeforeEach
+  public void init() throws IOException, URISyntaxException {
+    String tablePath = basePath + File.separator + "test_table";
+    duplicatedPartitionPath = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    repairedOutputPath = basePath + File.separator + "tmp";
+
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+    // generate 200 records
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+
+    String fileName1 = "1_0_20160401010101.parquet";
+    String fileName2 = "2_0_20160401010101.parquet";
+
+    List<HoodieRecord> hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName1, hoodieRecords1, schema, null, false);
+    List<HoodieRecord> hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName2, hoodieRecords2, schema, null, false);
+
+    // generate commit file
+    String fileId1 = UUID.randomUUID().toString();
+    String testWriteToken = "1-0-1";
+    String commitTime = FSUtils.getCommitTime(fileName1);
+    Files.createFile(Paths.get(duplicatedPartitionPath + "/"
+        + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime, 1, testWriteToken)));
+    Files.createFile(Paths.get(tablePath + "/.hoodie/" + commitTime + ".commit"));
+
+    // read records and get 10 to generate duplicates
+    Dataset df = sqlContext.read().parquet(duplicatedPartitionPath);
+
+    String fileName3 = "3_0_20160401010202.parquet";
+    commitTime = FSUtils.getCommitTime(fileName3);
+    df.limit(10).withColumn("_hoodie_commit_time", lit(commitTime))
+        .write().parquet(duplicatedPartitionPath + File.separator + fileName3);
+    Files.createFile(Paths.get(tablePath + "/.hoodie/" + commitTime + ".commit"));
+
+    metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+  }
+
+  /**
+   * Test case for dry run deduplicate.
+   */
+  @Test
+  public void testDeduplicate() throws IOException {
+    // get fs and check number of latest files
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+        fs.listStatus(new Path(duplicatedPartitionPath)));
+    List<String> filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+    assertEquals(3, filteredStatuses.size(), "There should be 3 files.");
+
+    // Before deduplicate, all files contain 210 records
+    String[] files = filteredStatuses.toArray(new String[0]);
+    Dataset df = sqlContext.read().parquet(files);
+    assertEquals(210, df.count());
+
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String cmdStr = "repair deduplicate --duplicatedPartitionPath " + partitionPath

Review comment:
       Can we use `String.format(xxx)` here?
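
A minimal sketch of the `String.format` variant being suggested for the command built in this test:

```java
// Same command string as above, assembled with String.format instead of concatenation.
String cmdStr = String.format(
    "repair deduplicate --duplicatedPartitionPath %s --repairedOutputPath %s --sparkMaster %s",
    partitionPath, repairedOutputPath, "local");
```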




----------------------------------------------------------------



[GitHub] [incubator-hudi] hddong commented on a change in pull request #1554: [HUDI-704]Add test for RepairsCommand

Posted by GitBox <gi...@apache.org>.
hddong commented on a change in pull request #1554:
URL: https://github.com/apache/incubator-hudi/pull/1554#discussion_r416285607



##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.RepairsCommand;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.spark.sql.Dataset;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.spark.sql.functions.lit;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test class for {@link RepairsCommand#deduplicate}.
+ */
+public class ITTestRepairsCommand extends AbstractShellIntegrationTest {

Review comment:
       > Any specific reason for having a separate class for RepairsCommand#deduplicate method?
   
    `deduplicate` needs an integration test (a unit test does not meet the requirements): `SparkLauncher` has to load the jars under `lib`, which are generated during `mvn package`.
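
A minimal illustration (not Hudi's actual SparkUtil code, and the jar path is hypothetical) of why a SparkLauncher-based command can only be exercised after `mvn package`: the launcher spawns a separate Spark process and must point at artifacts that exist only once the module has been built.

```java
import org.apache.spark.launcher.SparkLauncher;

public class LauncherSketch {
  public static void main(String[] args) throws Exception {
    // SparkLauncher forks a real spark-submit process; without the packaged jar
    // under lib/ (produced by `mvn package`), there is nothing for it to run.
    Process spark = new SparkLauncher()
        .setMaster("local")
        .setAppResource("hudi-cli/target/lib/hudi-cli.jar") // hypothetical packaged jar
        .setMainClass("org.apache.hudi.cli.commands.SparkMain")
        .addAppArgs("DEDUPLICATE", "local", "4G")
        .launch();
    spark.waitFor();
  }
}
```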




----------------------------------------------------------------



[GitHub] [incubator-hudi] pratyakshsharma edited a comment on pull request #1554: [HUDI-704]Add test for RepairsCommand

Posted by GitBox <gi...@apache.org>.
pratyakshsharma edited a comment on pull request #1554:
URL: https://github.com/apache/incubator-hudi/pull/1554#issuecomment-619529855


   @hddong You might want to have a look at https://github.com/apache/incubator-hudi/pull/1558 and add a test case for the upserts case as well :) 


----------------------------------------------------------------



[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1554: [HUDI-704]Add test for RepairsCommand

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1554:
URL: https://github.com/apache/incubator-hudi/pull/1554#discussion_r415281721



##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.RepairsCommand;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.spark.sql.Dataset;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.spark.sql.functions.lit;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test class for {@link RepairsCommand#deduplicate}.
+ */
+public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
+  String duplicatedPartitionPath;
+  String repairedOutputPath;
+
+  @Before
+  public void init() throws IOException, URISyntaxException {
+    String tablePath = basePath + File.separator + "test_table";
+    duplicatedPartitionPath = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    repairedOutputPath = basePath + File.separator + "tmp";
+
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+    // generate 200 records
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+
+    String fileName1 = "1_0_20160401010101.parquet";
+    String fileName2 = "2_0_20160401010101.parquet";
+
+    List<HoodieRecord> hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName1, hoodieRecords1, schema, null, false);
+    List<HoodieRecord> hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName2, hoodieRecords2, schema, null, false);
+
+    // generate commit file
+    String fileId1 = UUID.randomUUID().toString();
+    String testWriteToken = "1-0-1";
+    String commitTime = FSUtils.getCommitTime(fileName1);
+    new File(duplicatedPartitionPath + "/"
+        + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime, 1, testWriteToken))
+        .createNewFile();
+    new File(tablePath + "/.hoodie/" + commitTime + ".commit").createNewFile();
+
+    // read records and get 10 to generate duplicates
+    Dataset df = sqlContext.read().parquet(duplicatedPartitionPath);
+
+    String fileName3 = "3_0_20160401010101.parquet";
+    df.limit(10).withColumn("_hoodie_commit_time", lit("20160401010202"))

Review comment:
       This seems a bit misleading. If the file has a time of 20160401010101, how can the records have a time of 20160401010202? Rather, we should name the file 3_0_20160401010202.parquet and generate one more .commit file in the meta folder for it.
   
   Please correct me if I am missing something. 
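
A sketch of the fix being suggested here, which matches the updated version of the test quoted elsewhere in this thread (it also needs `java.nio.file.Files` and `java.nio.file.Paths` imports): name the third file with the later commit time and create a matching .commit file for it.

```java
// Derive the commit time from the new file name so records and file agree, then
// register that commit in the .hoodie meta folder.
String fileName3 = "3_0_20160401010202.parquet";
String commitTime3 = FSUtils.getCommitTime(fileName3);
df.limit(10).withColumn("_hoodie_commit_time", lit(commitTime3))
    .write().parquet(duplicatedPartitionPath + File.separator + fileName3);
Files.createFile(Paths.get(tablePath + "/.hoodie/" + commitTime3 + ".commit"));
```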




----------------------------------------------------------------



[GitHub] [incubator-hudi] yanghua commented on a change in pull request #1554: [HUDI-704]Add test for RepairsCommand

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #1554:
URL: https://github.com/apache/incubator-hudi/pull/1554#discussion_r418430527



##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
+import org.apache.hudi.cli.common.HoodieTestCommitMetadataGenerator;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for {@link RepairsCommand}.
+ */
+public class TestRepairsCommand extends AbstractShellIntegrationTest {
+
+  private String tablePath;
+
+  @Before
+  public void init() throws IOException {
+    String tableName = "test_table";
+    tablePath = basePath + File.separator + tableName;
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+  }
+
+  /**
+   * Test case for dry run 'repair addpartitionmeta'.
+   */
+  @Test
+  public void testAddPartitionMetaWithDryRun() throws IOException {
+    // create commit instant
+    new File(tablePath + "/.hoodie/100.commit").createNewFile();
+
+    // create partition path
+    String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+    String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+    fs.mkdirs(new Path(partition1));
+    fs.mkdirs(new Path(partition2));
+    fs.mkdirs(new Path(partition3));
+
+    // default is dry run.
+    CommandResult cr = getShell().executeCommand("repair addpartitionmeta");
+    Assert.assertTrue(cr.isSuccess());
+
+    // expected all 'No'.
+    String[][] rows = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath)
+        .stream()
+        .map(partition -> new String[]{partition, "No", "None"})
+        .toArray(String[][]::new);
+    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
+        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
+
+    Assert.assertEquals(expected, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for real run 'repair addpartitionmeta'.
+   */
+  @Test
+  public void testAddPartitionMetaWithRealRun() throws IOException {
+    // create commit instant
+    new File(tablePath + "/.hoodie/100.commit").createNewFile();

Review comment:
       ditto

##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
+import org.apache.hudi.cli.common.HoodieTestCommitMetadataGenerator;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for {@link RepairsCommand}.
+ */
+public class TestRepairsCommand extends AbstractShellIntegrationTest {
+
+  private String tablePath;
+
+  @Before
+  public void init() throws IOException {
+    String tableName = "test_table";
+    tablePath = basePath + File.separator + tableName;
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+  }
+
+  /**
+   * Test case for dry run 'repair addpartitionmeta'.
+   */
+  @Test
+  public void testAddPartitionMetaWithDryRun() throws IOException {
+    // create commit instant
+    new File(tablePath + "/.hoodie/100.commit").createNewFile();
+
+    // create partition path
+    String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+    String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+    fs.mkdirs(new Path(partition1));
+    fs.mkdirs(new Path(partition2));
+    fs.mkdirs(new Path(partition3));
+
+    // default is dry run.
+    CommandResult cr = getShell().executeCommand("repair addpartitionmeta");
+    Assert.assertTrue(cr.isSuccess());
+
+    // expected all 'No'.
+    String[][] rows = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath)
+        .stream()
+        .map(partition -> new String[]{partition, "No", "None"})
+        .toArray(String[][]::new);
+    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
+        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
+
+    Assert.assertEquals(expected, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for real run 'repair addpartitionmeta'.
+   */
+  @Test
+  public void testAddPartitionMetaWithRealRun() throws IOException {
+    // create commit instant
+    new File(tablePath + "/.hoodie/100.commit").createNewFile();
+
+    // create partition path
+    String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+    String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+    fs.mkdirs(new Path(partition1));
+    fs.mkdirs(new Path(partition2));
+    fs.mkdirs(new Path(partition3));

Review comment:
       Multiple methods have this code snippet. Can we extract it into the `init` method? Or check whether the directories exist before creating them?
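
A sketch of the extraction being suggested (the helper name is illustrative), using the test's existing fields and also checking for existence before creating, per the second option:

```java
// Hypothetical shared helper for the three partition directories created in several tests.
private void createPartitionDirsIfAbsent() throws IOException {
  for (String partition : Arrays.asList(
      HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
      HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
      HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)) {
    Path partitionDir = new Path(tablePath + File.separator + partition);
    if (!fs.exists(partitionDir)) {
      fs.mkdirs(partitionDir);
    }
  }
}
```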

##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
+import org.apache.hudi.cli.common.HoodieTestCommitMetadataGenerator;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for {@link RepairsCommand}.
+ */
+public class TestRepairsCommand extends AbstractShellIntegrationTest {
+
+  private String tablePath;
+
+  @Before
+  public void init() throws IOException {
+    String tableName = "test_table";
+    tablePath = basePath + File.separator + tableName;
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+  }
+
+  /**
+   * Test case for dry run 'repair addpartitionmeta'.
+   */
+  @Test
+  public void testAddPartitionMetaWithDryRun() throws IOException {
+    // create commit instant
+    new File(tablePath + "/.hoodie/100.commit").createNewFile();

Review comment:
       Use `Files.createFile()`?
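
A one-line sketch of the `java.nio` alternative being suggested; the later revision of ITTestRepairsCommand quoted in this thread uses the same call (it requires `java.nio.file.Files` and `java.nio.file.Paths` imports):

```java
// Creates the commit marker and throws if the file cannot be created, unlike
// File#createNewFile, whose boolean return value is easy to ignore.
Files.createFile(Paths.get(tablePath + "/.hoodie/100.commit"));
```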

##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
+import org.apache.hudi.cli.common.HoodieTestCommitMetadataGenerator;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for {@link RepairsCommand}.
+ */
+public class TestRepairsCommand extends AbstractShellIntegrationTest {
+
+  private String tablePath;
+
+  @Before
+  public void init() throws IOException {
+    String tableName = "test_table";
+    tablePath = basePath + File.separator + tableName;
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+  }
+
+  /**
+   * Test case for dry run 'repair addpartitionmeta'.
+   */
+  @Test
+  public void testAddPartitionMetaWithDryRun() throws IOException {
+    // create commit instant
+    new File(tablePath + "/.hoodie/100.commit").createNewFile();
+
+    // create partition path
+    String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+    String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+    fs.mkdirs(new Path(partition1));
+    fs.mkdirs(new Path(partition2));
+    fs.mkdirs(new Path(partition3));
+
+    // default is dry run.
+    CommandResult cr = getShell().executeCommand("repair addpartitionmeta");
+    Assert.assertTrue(cr.isSuccess());
+
+    // expected all 'No'.
+    String[][] rows = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath)
+        .stream()
+        .map(partition -> new String[]{partition, "No", "None"})
+        .toArray(String[][]::new);
+    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
+        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
+
+    Assert.assertEquals(expected, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for real run 'repair addpartitionmeta'.
+   */
+  @Test
+  public void testAddPartitionMetaWithRealRun() throws IOException {
+    // create commit instant
+    new File(tablePath + "/.hoodie/100.commit").createNewFile();
+
+    // create partition path
+    String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+    String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+    fs.mkdirs(new Path(partition1));
+    fs.mkdirs(new Path(partition2));
+    fs.mkdirs(new Path(partition3));
+
+    CommandResult cr = getShell().executeCommand("repair addpartitionmeta --dryrun false");
+    Assert.assertTrue(cr.isSuccess());
+
+    List<String> paths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath);
+    // after the real run, the action will be 'Repaired'
+    String[][] rows = paths.stream()
+        .map(partition -> new String[]{partition, "No", "Repaired"})
+        .toArray(String[][]::new);
+    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
+        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
+
+    Assert.assertEquals(expected, cr.getResult().toString());
+
+    cr = getShell().executeCommand("repair addpartitionmeta");
+
+    // after real run, Metadata is present now.
+    rows = paths.stream()
+        .map(partition -> new String[]{partition, "Yes", "None"})
+        .toArray(String[][]::new);
+    expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
+        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
+    Assert.assertEquals(expected, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for 'repair overwrite-hoodie-props'.
+   */
+  @Test
+  public void testOverwriteHoodieProperties() throws IOException {
+    URL newProps = this.getClass().getClassLoader().getResource("table-config.properties");
+    assertNotNull("New property file must exist", newProps);
+
+    CommandResult cr = getShell().executeCommand("repair overwrite-hoodie-props --new-props-file " + newProps.getPath());
+    Assert.assertTrue(cr.isSuccess());
+
+    Map<String, String> oldProps = HoodieCLI.getTableMetaClient().getTableConfig().getProps();
+
+    // after overwrite, the stored value in .hoodie is equals to which read from properties.
+    Map<String, String> result = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().getProps();
+    Properties expectProps = new Properties();
+    expectProps.load(new FileInputStream(new File(newProps.getPath())));
+
+    Map<String, String> expected = expectProps.entrySet().stream()
+        .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));
+    Assert.assertEquals(expected, result);
+
+    // check result
+    List<String> allPropsStr = Arrays.asList("hoodie.table.name", "hoodie.table.type",
+        "hoodie.archivelog.folder", "hoodie.timeline.layout.version");
+    String[][] rows = allPropsStr.stream().sorted().map(key -> new String[]{key,
+        oldProps.getOrDefault(key, null), result.getOrDefault(key, null)})
+        .toArray(String[][]::new);
+    String expect = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY,
+        HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows);
+
+    Assert.assertEquals(expect, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for 'repair corrupted clean files'.
+   */
+  @Test
+  public void testRemoveCorruptedPendingCleanAction() throws IOException {
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    Configuration conf = HoodieCLI.conf;
+
+    metaClient = HoodieCLI.getTableMetaClient();
+
+    // Create four requested files
+    for (int i = 100; i < 104; i++) {
+      String timestamp = String.valueOf(i);
+      // Write corrupted requested Compaction
+      HoodieTestCommitMetadataGenerator.createCompactionRequestedFile(tablePath, timestamp, conf);
+    }
+
+    // reload metaclient
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    // first, there are four instants
+    assertEquals(4, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count());
+
+    CommandResult cr = getShell().executeCommand("repair corrupted clean files");
+    assertTrue(cr.isSuccess());
+
+    // reload metaclient

Review comment:
       `metaclient` -> `meta client`?

##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.RepairsCommand;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.spark.sql.Dataset;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.spark.sql.functions.lit;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test class for {@link RepairsCommand#deduplicate}.
+ */
+public class ITTestRepairsCommand extends AbstractShellIntegrationTest {

Review comment:
       Would it be better to add the description you mentioned to the Javadoc of this class?
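       For illustration, a minimal sketch of such a Javadoc (the wording below is only an assumption of what the description could say, based on the setup in init()):

           /**
            * Integration test class for {@link RepairsCommand#deduplicate}.
            * <p>
            * The test table is created with two base files holding 100 records each, plus a
            * third file containing 10 duplicated records, so the deduplicate command has
            * real duplicates (210 records before, 200 after) to repair.
            */
           public class ITTestRepairsCommand extends AbstractShellIntegrationTest {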

##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.RepairsCommand;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.spark.sql.Dataset;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.spark.sql.functions.lit;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test class for {@link RepairsCommand#deduplicate}.
+ */
+public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
+  String duplicatedPartitionPath;
+  String repairedOutputPath;
+
+  @Before
+  public void init() throws IOException, URISyntaxException {
+    String tablePath = basePath + File.separator + "test_table";
+    duplicatedPartitionPath = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    repairedOutputPath = basePath + File.separator + "tmp";
+
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+    // generate 200 records
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+
+    String fileName1 = "1_0_20160401010101.parquet";
+    String fileName2 = "2_0_20160401010101.parquet";
+
+    List<HoodieRecord> hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName1, hoodieRecords1, schema, null, false);
+    List<HoodieRecord> hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName2, hoodieRecords2, schema, null, false);
+
+    // generate commit file
+    String fileId1 = UUID.randomUUID().toString();
+    String testWriteToken = "1-0-1";
+    String commitTime = FSUtils.getCommitTime(fileName1);
+    new File(duplicatedPartitionPath + "/"
+        + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime, 1, testWriteToken))
+        .createNewFile();
+    new File(tablePath + "/.hoodie/" + commitTime + ".commit").createNewFile();
+
+    // read records and get 10 to generate duplicates
+    Dataset df = sqlContext.read().parquet(duplicatedPartitionPath);
+
+    String fileName3 = "3_0_20160401010202.parquet";
+    commitTime = FSUtils.getCommitTime(fileName3);
+    df.limit(10).withColumn("_hoodie_commit_time", lit(commitTime))
+        .write().parquet(duplicatedPartitionPath + File.separator + fileName3);
+    new File(tablePath + "/.hoodie/" + commitTime + ".commit").createNewFile();
+
+    metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+  }
+
+  /**
+   * Test case for dry run deduplicate.
+   */
+  @Test
+  public void testDeduplicate() throws IOException {
+    // get fs and check number of latest files
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+        fs.listStatus(new Path(duplicatedPartitionPath)));
+    List<String> filteredStatuses = fsView.getLatestBaseFiles().map(f -> f.getPath()).collect(Collectors.toList());
+    assertEquals("There should be 3 files.", 3, filteredStatuses.size());
+
+    // Before deduplicate, all files contain 210 records
+    String[] files = filteredStatuses.toArray(new String[filteredStatuses.size()]);
+    Dataset df = sqlContext.read().parquet(files);
+    assertEquals(210, df.count());
+
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String cmdStr = "repair deduplicate --duplicatedPartitionPath " + partitionPath
+        + " --repairedOutputPath " + repairedOutputPath + " --sparkMaster local";
+    CommandResult cr = getShell().executeCommand(cmdStr);
+    assertTrue(cr.isSuccess());
+    assertEquals(RepairsCommand.DEDUPLICATE_RETURN_PREFIX + repairedOutputPath, cr.getResult().toString());
+
+    // After deduplicate, there are 200 records
+    FileStatus[] fileStatus = fs.listStatus(new Path(repairedOutputPath));
+    files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new);
+    Dataset result = sqlContext.read().parquet(files);
+    assertEquals(200, result.count());
+  }
+
+  /**
+   * Test case for real run deduplicate.
+   */
+  @Test
+  public void testDeduplicateWithReal() throws IOException {
+    // get fs and check number of latest files
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+        fs.listStatus(new Path(duplicatedPartitionPath)));
+    List<String> filteredStatuses = fsView.getLatestBaseFiles().map(f -> f.getPath()).collect(Collectors.toList());
+    assertEquals("There should be 3 files.", 3, filteredStatuses.size());
+
+    // Before deduplicate, all files contain 210 records
+    String[] files = filteredStatuses.toArray(new String[filteredStatuses.size()]);

Review comment:
       ditto

##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.RepairsCommand;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.spark.sql.Dataset;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.spark.sql.functions.lit;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test class for {@link RepairsCommand#deduplicate}.
+ */
+public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
+  String duplicatedPartitionPath;
+  String repairedOutputPath;

Review comment:
       add an empty line before these two lines and mark them `private`.
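       i.e., something along the lines of:

           public class ITTestRepairsCommand extends AbstractShellIntegrationTest {

             private String duplicatedPartitionPath;
             private String repairedOutputPath;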

##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.RepairsCommand;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.spark.sql.Dataset;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.spark.sql.functions.lit;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test class for {@link RepairsCommand#deduplicate}.
+ */
+public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
+  String duplicatedPartitionPath;
+  String repairedOutputPath;
+
+  @Before
+  public void init() throws IOException, URISyntaxException {
+    String tablePath = basePath + File.separator + "test_table";
+    duplicatedPartitionPath = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    repairedOutputPath = basePath + File.separator + "tmp";
+
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+    // generate 200 records
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+
+    String fileName1 = "1_0_20160401010101.parquet";
+    String fileName2 = "2_0_20160401010101.parquet";
+
+    List<HoodieRecord> hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName1, hoodieRecords1, schema, null, false);
+    List<HoodieRecord> hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName2, hoodieRecords2, schema, null, false);
+
+    // generate commit file
+    String fileId1 = UUID.randomUUID().toString();
+    String testWriteToken = "1-0-1";
+    String commitTime = FSUtils.getCommitTime(fileName1);
+    new File(duplicatedPartitionPath + "/"
+        + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime, 1, testWriteToken))
+        .createNewFile();
+    new File(tablePath + "/.hoodie/" + commitTime + ".commit").createNewFile();
+
+    // read records and get 10 to generate duplicates
+    Dataset df = sqlContext.read().parquet(duplicatedPartitionPath);
+
+    String fileName3 = "3_0_20160401010202.parquet";
+    commitTime = FSUtils.getCommitTime(fileName3);
+    df.limit(10).withColumn("_hoodie_commit_time", lit(commitTime))
+        .write().parquet(duplicatedPartitionPath + File.separator + fileName3);
+    new File(tablePath + "/.hoodie/" + commitTime + ".commit").createNewFile();

Review comment:
       ditto

##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.RepairsCommand;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.spark.sql.Dataset;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.spark.sql.functions.lit;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test class for {@link RepairsCommand#deduplicate}.
+ */
+public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
+  String duplicatedPartitionPath;
+  String repairedOutputPath;
+
+  @Before
+  public void init() throws IOException, URISyntaxException {
+    String tablePath = basePath + File.separator + "test_table";
+    duplicatedPartitionPath = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    repairedOutputPath = basePath + File.separator + "tmp";
+
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+    // generate 200 records
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+
+    String fileName1 = "1_0_20160401010101.parquet";
+    String fileName2 = "2_0_20160401010101.parquet";
+
+    List<HoodieRecord> hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName1, hoodieRecords1, schema, null, false);
+    List<HoodieRecord> hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName2, hoodieRecords2, schema, null, false);
+
+    // generate commit file
+    String fileId1 = UUID.randomUUID().toString();
+    String testWriteToken = "1-0-1";
+    String commitTime = FSUtils.getCommitTime(fileName1);
+    new File(duplicatedPartitionPath + "/"
+        + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime, 1, testWriteToken))
+        .createNewFile();

Review comment:
       `Files.createFile`
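       A rough sketch of the suggested replacement, using java.nio.file (note that Files.createFile throws if the file already exists, unlike File#createNewFile, which just returns false):

           import java.nio.file.Files;
           import java.nio.file.Paths;

           // create the empty log file and the empty commit file
           Files.createFile(Paths.get(duplicatedPartitionPath,
               FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime, 1, testWriteToken)));
           Files.createFile(Paths.get(tablePath, ".hoodie", commitTime + ".commit"));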

##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.RepairsCommand;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.spark.sql.Dataset;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.spark.sql.functions.lit;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test class for {@link RepairsCommand#deduplicate}.
+ */
+public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
+  String duplicatedPartitionPath;
+  String repairedOutputPath;
+
+  @Before
+  public void init() throws IOException, URISyntaxException {
+    String tablePath = basePath + File.separator + "test_table";
+    duplicatedPartitionPath = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    repairedOutputPath = basePath + File.separator + "tmp";
+
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+    // generate 200 records
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+
+    String fileName1 = "1_0_20160401010101.parquet";
+    String fileName2 = "2_0_20160401010101.parquet";
+
+    List<HoodieRecord> hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName1, hoodieRecords1, schema, null, false);
+    List<HoodieRecord> hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName2, hoodieRecords2, schema, null, false);
+
+    // generate commit file
+    String fileId1 = UUID.randomUUID().toString();
+    String testWriteToken = "1-0-1";
+    String commitTime = FSUtils.getCommitTime(fileName1);
+    new File(duplicatedPartitionPath + "/"
+        + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime, 1, testWriteToken))
+        .createNewFile();
+    new File(tablePath + "/.hoodie/" + commitTime + ".commit").createNewFile();

Review comment:
       ditto

##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
+import org.apache.hudi.cli.common.HoodieTestCommitMetadataGenerator;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for {@link RepairsCommand}.
+ */
+public class TestRepairsCommand extends AbstractShellIntegrationTest {
+
+  private String tablePath;
+
+  @Before
+  public void init() throws IOException {
+    String tableName = "test_table";
+    tablePath = basePath + File.separator + tableName;
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+  }
+
+  /**
+   * Test case for dry run 'repair addpartitionmeta'.
+   */
+  @Test
+  public void testAddPartitionMetaWithDryRun() throws IOException {
+    // create commit instant
+    new File(tablePath + "/.hoodie/100.commit").createNewFile();
+
+    // create partition path
+    String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+    String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+    fs.mkdirs(new Path(partition1));
+    fs.mkdirs(new Path(partition2));
+    fs.mkdirs(new Path(partition3));
+
+    // default is dry run.
+    CommandResult cr = getShell().executeCommand("repair addpartitionmeta");
+    Assert.assertTrue(cr.isSuccess());
+
+    // expected all 'No'.
+    String[][] rows = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath)
+        .stream()
+        .map(partition -> new String[]{partition, "No", "None"})
+        .toArray(String[][]::new);
+    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
+        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
+
+    Assert.assertEquals(expected, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for real run 'repair addpartitionmeta'.
+   */
+  @Test
+  public void testAddPartitionMetaWithRealRun() throws IOException {
+    // create commit instant
+    new File(tablePath + "/.hoodie/100.commit").createNewFile();
+
+    // create partition path
+    String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+    String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+    fs.mkdirs(new Path(partition1));
+    fs.mkdirs(new Path(partition2));
+    fs.mkdirs(new Path(partition3));
+
+    CommandResult cr = getShell().executeCommand("repair addpartitionmeta --dryrun false");
+    Assert.assertTrue(cr.isSuccess());
+
+    List<String> paths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath);
+    // after the real run, the action will be 'Repaired'
+    String[][] rows = paths.stream()
+        .map(partition -> new String[]{partition, "No", "Repaired"})
+        .toArray(String[][]::new);
+    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
+        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
+
+    Assert.assertEquals(expected, cr.getResult().toString());
+
+    cr = getShell().executeCommand("repair addpartitionmeta");
+
+    // after the previous real run, metadata is present now.
+    rows = paths.stream()
+        .map(partition -> new String[]{partition, "Yes", "None"})
+        .toArray(String[][]::new);
+    expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
+        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
+    Assert.assertEquals(expected, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for 'repair overwrite-hoodie-props'.
+   */
+  @Test
+  public void testOverwriteHoodieProperties() throws IOException {
+    URL newProps = this.getClass().getClassLoader().getResource("table-config.properties");
+    assertNotNull("New property file must exist", newProps);
+
+    CommandResult cr = getShell().executeCommand("repair overwrite-hoodie-props --new-props-file " + newProps.getPath());
+    Assert.assertTrue(cr.isSuccess());
+
+    Map<String, String> oldProps = HoodieCLI.getTableMetaClient().getTableConfig().getProps();
+
+    // after overwrite, the value stored in .hoodie should equal the value read from the properties file.
+    Map<String, String> result = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().getProps();
+    Properties expectProps = new Properties();
+    expectProps.load(new FileInputStream(new File(newProps.getPath())));
+
+    Map<String, String> expected = expectProps.entrySet().stream()
+        .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));
+    Assert.assertEquals(expected, result);
+
+    // check result
+    List<String> allPropsStr = Arrays.asList("hoodie.table.name", "hoodie.table.type",
+        "hoodie.archivelog.folder", "hoodie.timeline.layout.version");
+    String[][] rows = allPropsStr.stream().sorted().map(key -> new String[]{key,
+        oldProps.getOrDefault(key, null), result.getOrDefault(key, null)})
+        .toArray(String[][]::new);
+    String expect = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY,
+        HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows);
+
+    Assert.assertEquals(expect, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for 'repair corrupted clean files'.
+   */
+  @Test
+  public void testRemoveCorruptedPendingCleanAction() throws IOException {
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    Configuration conf = HoodieCLI.conf;
+
+    metaClient = HoodieCLI.getTableMetaClient();
+
+    // Create four requested files
+    for (int i = 100; i < 104; i++) {
+      String timestamp = String.valueOf(i);
+      // Write corrupted requested Compaction
+      HoodieTestCommitMetadataGenerator.createCompactionRequestedFile(tablePath, timestamp, conf);
+    }
+
+    // reload metaclient

Review comment:
       `metaclient` -> `meta client`?

##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.RepairsCommand;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.spark.sql.Dataset;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.spark.sql.functions.lit;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test class for {@link RepairsCommand#deduplicate}.
+ */
+public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
+  String duplicatedPartitionPath;
+  String repairedOutputPath;
+
+  @Before
+  public void init() throws IOException, URISyntaxException {
+    String tablePath = basePath + File.separator + "test_table";
+    duplicatedPartitionPath = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    repairedOutputPath = basePath + File.separator + "tmp";
+
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+    // generate 200 records
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+
+    String fileName1 = "1_0_20160401010101.parquet";
+    String fileName2 = "2_0_20160401010101.parquet";
+
+    List<HoodieRecord> hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName1, hoodieRecords1, schema, null, false);
+    List<HoodieRecord> hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName2, hoodieRecords2, schema, null, false);
+
+    // generate commit file
+    String fileId1 = UUID.randomUUID().toString();
+    String testWriteToken = "1-0-1";
+    String commitTime = FSUtils.getCommitTime(fileName1);
+    new File(duplicatedPartitionPath + "/"
+        + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime, 1, testWriteToken))
+        .createNewFile();
+    new File(tablePath + "/.hoodie/" + commitTime + ".commit").createNewFile();
+
+    // read records and get 10 to generate duplicates
+    Dataset df = sqlContext.read().parquet(duplicatedPartitionPath);
+
+    String fileName3 = "3_0_20160401010202.parquet";
+    commitTime = FSUtils.getCommitTime(fileName3);
+    df.limit(10).withColumn("_hoodie_commit_time", lit(commitTime))
+        .write().parquet(duplicatedPartitionPath + File.separator + fileName3);
+    new File(tablePath + "/.hoodie/" + commitTime + ".commit").createNewFile();
+
+    metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+  }
+
+  /**
+   * Test case for dry run deduplicate.
+   */
+  @Test
+  public void testDeduplicate() throws IOException {
+    // get fs and check number of latest files
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+        fs.listStatus(new Path(duplicatedPartitionPath)));
+    List<String> filteredStatuses = fsView.getLatestBaseFiles().map(f -> f.getPath()).collect(Collectors.toList());
+    assertEquals("There should be 3 files.", 3, filteredStatuses.size());
+
+    // Before deduplicate, all files contain 210 records
+    String[] files = filteredStatuses.toArray(new String[filteredStatuses.size()]);

Review comment:
       We do not need to specify the size for `toArray()`. `new String[0]` is OK.
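       i.e.:

           String[] files = filteredStatuses.toArray(new String[0]);

       The JVM allocates a correctly sized array itself when the passed array is too small, so the zero-length form is the simpler idiom.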

##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.RepairsCommand;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.spark.sql.Dataset;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.spark.sql.functions.lit;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test class for {@link RepairsCommand#deduplicate}.
+ */
+public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
+  String duplicatedPartitionPath;
+  String repairedOutputPath;
+
+  @Before
+  public void init() throws IOException, URISyntaxException {
+    String tablePath = basePath + File.separator + "test_table";
+    duplicatedPartitionPath = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    repairedOutputPath = basePath + File.separator + "tmp";
+
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+    // generate 200 records
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+
+    String fileName1 = "1_0_20160401010101.parquet";
+    String fileName2 = "2_0_20160401010101.parquet";
+
+    List<HoodieRecord> hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName1, hoodieRecords1, schema, null, false);
+    List<HoodieRecord> hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName2, hoodieRecords2, schema, null, false);
+
+    // generate commit file
+    String fileId1 = UUID.randomUUID().toString();
+    String testWriteToken = "1-0-1";
+    String commitTime = FSUtils.getCommitTime(fileName1);
+    new File(duplicatedPartitionPath + "/"
+        + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime, 1, testWriteToken))
+        .createNewFile();
+    new File(tablePath + "/.hoodie/" + commitTime + ".commit").createNewFile();
+
+    // read records and get 10 to generate duplicates
+    Dataset df = sqlContext.read().parquet(duplicatedPartitionPath);
+
+    String fileName3 = "3_0_20160401010202.parquet";
+    commitTime = FSUtils.getCommitTime(fileName3);
+    df.limit(10).withColumn("_hoodie_commit_time", lit(commitTime))
+        .write().parquet(duplicatedPartitionPath + File.separator + fileName3);
+    new File(tablePath + "/.hoodie/" + commitTime + ".commit").createNewFile();
+
+    metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+  }
+
+  /**
+   * Test case for dry run deduplicate.
+   */
+  @Test
+  public void testDeduplicate() throws IOException {
+    // get fs and check number of latest files
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+        fs.listStatus(new Path(duplicatedPartitionPath)));
+    List<String> filteredStatuses = fsView.getLatestBaseFiles().map(f -> f.getPath()).collect(Collectors.toList());

Review comment:
       `map(f -> f.getPath())` -> `map(HoodieBaseFile::getPath)`
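       i.e., with an extra import for the model class (assuming HoodieBaseFile lives in org.apache.hudi.common.model):

           import org.apache.hudi.common.model.HoodieBaseFile;

           List<String> filteredStatuses = fsView.getLatestBaseFiles()
               .map(HoodieBaseFile::getPath)
               .collect(Collectors.toList());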

##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.RepairsCommand;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.spark.sql.Dataset;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.spark.sql.functions.lit;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test class for {@link RepairsCommand#deduplicate}.
+ */
+public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
+  String duplicatedPartitionPath;
+  String repairedOutputPath;
+
+  @Before
+  public void init() throws IOException, URISyntaxException {
+    String tablePath = basePath + File.separator + "test_table";
+    duplicatedPartitionPath = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    repairedOutputPath = basePath + File.separator + "tmp";
+
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+    // generate 200 records
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+
+    String fileName1 = "1_0_20160401010101.parquet";
+    String fileName2 = "2_0_20160401010101.parquet";
+
+    List<HoodieRecord> hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName1, hoodieRecords1, schema, null, false);
+    List<HoodieRecord> hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName2, hoodieRecords2, schema, null, false);
+
+    // generate commit file
+    String fileId1 = UUID.randomUUID().toString();
+    String testWriteToken = "1-0-1";
+    String commitTime = FSUtils.getCommitTime(fileName1);
+    new File(duplicatedPartitionPath + "/"
+        + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime, 1, testWriteToken))
+        .createNewFile();
+    new File(tablePath + "/.hoodie/" + commitTime + ".commit").createNewFile();
+
+    // read records and get 10 to generate duplicates
+    Dataset df = sqlContext.read().parquet(duplicatedPartitionPath);
+
+    String fileName3 = "3_0_20160401010202.parquet";
+    commitTime = FSUtils.getCommitTime(fileName3);
+    df.limit(10).withColumn("_hoodie_commit_time", lit(commitTime))
+        .write().parquet(duplicatedPartitionPath + File.separator + fileName3);
+    new File(tablePath + "/.hoodie/" + commitTime + ".commit").createNewFile();
+
+    metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+  }
+
+  /**
+   * Test case for dry run deduplicate.
+   */
+  @Test
+  public void testDeduplicate() throws IOException {
+    // get fs and check number of latest files
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+        fs.listStatus(new Path(duplicatedPartitionPath)));
+    List<String> filteredStatuses = fsView.getLatestBaseFiles().map(f -> f.getPath()).collect(Collectors.toList());
+    assertEquals("There should be 3 files.", 3, filteredStatuses.size());
+
+    // Before deduplicate, all files contain 210 records
+    String[] files = filteredStatuses.toArray(new String[filteredStatuses.size()]);
+    Dataset df = sqlContext.read().parquet(files);
+    assertEquals(210, df.count());
+
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String cmdStr = "repair deduplicate --duplicatedPartitionPath " + partitionPath
+        + " --repairedOutputPath " + repairedOutputPath + " --sparkMaster local";
+    CommandResult cr = getShell().executeCommand(cmdStr);
+    assertTrue(cr.isSuccess());
+    assertEquals(RepairsCommand.DEDUPLICATE_RETURN_PREFIX + repairedOutputPath, cr.getResult().toString());
+
+    // After deduplicate, there are 200 records
+    FileStatus[] fileStatus = fs.listStatus(new Path(repairedOutputPath));
+    files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new);
+    Dataset result = sqlContext.read().parquet(files);
+    assertEquals(200, result.count());
+  }
+
+  /**
+   * Test case for real run deduplicate.
+   */
+  @Test
+  public void testDeduplicateWithReal() throws IOException {
+    // get fs and check number of latest files
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+        fs.listStatus(new Path(duplicatedPartitionPath)));
+    List<String> filteredStatuses = fsView.getLatestBaseFiles().map(f -> f.getPath()).collect(Collectors.toList());

Review comment:
       ditto




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] hddong commented on pull request #1554: [HUDI-704]Add test for RepairsCommand

Posted by GitBox <gi...@apache.org>.
hddong commented on pull request #1554:
URL: https://github.com/apache/incubator-hudi/pull/1554#issuecomment-623485387






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] hddong removed a comment on pull request #1554: [HUDI-704]Add test for RepairsCommand

Posted by GitBox <gi...@apache.org>.
hddong removed a comment on pull request #1554:
URL: https://github.com/apache/incubator-hudi/pull/1554#issuecomment-623485400


   @yanghua : Thanks for your review and comments, I have addressed them.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1554: [HUDI-704]Add test for RepairsCommand

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1554:
URL: https://github.com/apache/incubator-hudi/pull/1554#discussion_r415282903



##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.RepairsCommand;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.spark.sql.Dataset;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.spark.sql.functions.lit;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test class for {@link RepairsCommand#deduplicate}.
+ */
+public class ITTestRepairsCommand extends AbstractShellIntegrationTest {

Review comment:
       Any specific reason for having a separate class for the RepairsCommand#deduplicate method?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1554: [HUDI-704]Add test for RepairsCommand

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1554:
URL: https://github.com/apache/incubator-hudi/pull/1554#discussion_r415281069



##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.RepairsCommand;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.spark.sql.Dataset;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.spark.sql.functions.lit;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test class for {@link RepairsCommand#deduplicate}.
+ */
+public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
+  String duplicatedPartitionPath;
+  String repairedOutputPath;
+
+  @Before
+  public void init() throws IOException, URISyntaxException {
+    String tablePath = basePath + File.separator + "test_table";
+    duplicatedPartitionPath = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    repairedOutputPath = basePath + File.separator + "tmp";
+
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+    // generate 200 records
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+
+    String fileName1 = "1_0_20160401010101.parquet";
+    String fileName2 = "2_0_20160401010101.parquet";
+
+    List<HoodieRecord> hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName1, hoodieRecords1, schema, null, false);
+    List<HoodieRecord> hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName2, hoodieRecords2, schema, null, false);
+
+    // generate commit file
+    String fileId1 = UUID.randomUUID().toString();
+    String testWriteToken = "1-0-1";
+    String commitTime = FSUtils.getCommitTime(fileName1);
+    new File(duplicatedPartitionPath + "/"
+        + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime, 1, testWriteToken))
+        .createNewFile();
+    new File(tablePath + "/.hoodie/" + commitTime + ".commit").createNewFile();
+
+    // read records and get 10 to generate duplicates
+    Dataset df = sqlContext.read().parquet(duplicatedPartitionPath);
+
+    String fileName3 = "3_0_20160401010101.parquet";
+    df.limit(10).withColumn("_hoodie_commit_time", lit("20160401010202"))
+        .write().parquet(duplicatedPartitionPath + File.separator + fileName3);
+
+    metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+  }
+
+  /**
+   * Test case for dry run deduplicate.
+   */
+  @Test
+  public void testDeduplicate() throws IOException {
+    // get fs and check number of latest files
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+        fs.listStatus(new Path(duplicatedPartitionPath)));
+    List<String> filteredStatuses = fsView.getLatestBaseFiles().map(f -> f.getPath()).collect(Collectors.toList());
+    assertEquals("There should be 3 files.", 3, filteredStatuses.size());
+
+    // Before deduplicate, all files contain 210 records
+    String[] files = filteredStatuses.toArray(new String[filteredStatuses.size()]);
+    Dataset df = sqlContext.read().parquet(files);
+    assertEquals(210, df.count());
+
+    String partitionPath = "2016/03/15";

Review comment:
       Let us use the constant from HoodieTestDataGenerator rather than this hard-coded string when running the actual command.
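
       For example, a minimal sketch of that change (assuming HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH resolves to the same "2016/03/15" value used in init()):

           String partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
           String cmdStr = "repair deduplicate --duplicatedPartitionPath " + partitionPath
               + " --repairedOutputPath " + repairedOutputPath + " --sparkMaster local";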




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] pratyakshsharma commented on pull request #1554: [HUDI-704]Add test for RepairsCommand

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on pull request #1554:
URL: https://github.com/apache/incubator-hudi/pull/1554#issuecomment-619529855


   @hddong You might want to have a look at https://github.com/apache/incubator-hudi/pull/1558/files and add a test case for upserts as well :) 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1554: [HUDI-704]Add test for RepairsCommand

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1554:
URL: https://github.com/apache/incubator-hudi/pull/1554#discussion_r415281880



##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.RepairsCommand;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.spark.sql.Dataset;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.spark.sql.functions.lit;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test class for {@link RepairsCommand#deduplicate}.
+ */
+public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
+  String duplicatedPartitionPath;
+  String repairedOutputPath;
+
+  @Before
+  public void init() throws IOException, URISyntaxException {
+    String tablePath = basePath + File.separator + "test_table";
+    duplicatedPartitionPath = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    repairedOutputPath = basePath + File.separator + "tmp";
+
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+    // generate 200 records
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+
+    String fileName1 = "1_0_20160401010101.parquet";
+    String fileName2 = "2_0_20160401010101.parquet";
+
+    List<HoodieRecord> hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName1, hoodieRecords1, schema, null, false);
+    List<HoodieRecord> hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName2, hoodieRecords2, schema, null, false);
+
+    // generate commit file
+    String fileId1 = UUID.randomUUID().toString();
+    String testWriteToken = "1-0-1";
+    String commitTime = FSUtils.getCommitTime(fileName1);
+    new File(duplicatedPartitionPath + "/"
+        + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime, 1, testWriteToken))
+        .createNewFile();
+    new File(tablePath + "/.hoodie/" + commitTime + ".commit").createNewFile();
+
+    // read records and get 10 to generate duplicates
+    Dataset df = sqlContext.read().parquet(duplicatedPartitionPath);
+
+    String fileName3 = "3_0_20160401010101.parquet";
+    df.limit(10).withColumn("_hoodie_commit_time", lit("20160401010202"))
+        .write().parquet(duplicatedPartitionPath + File.separator + fileName3);
+
+    metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+  }
+
+  /**
+   * Test case for dry run deduplicate.
+   */
+  @Test
+  public void testDeduplicate() throws IOException {
+    // get fs and check number of latest files
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+        fs.listStatus(new Path(duplicatedPartitionPath)));
+    List<String> filteredStatuses = fsView.getLatestBaseFiles().map(f -> f.getPath()).collect(Collectors.toList());
+    assertEquals("There should be 3 files.", 3, filteredStatuses.size());
+
+    // Before deduplicate, all files contain 210 records
+    String[] files = filteredStatuses.toArray(new String[filteredStatuses.size()]);
+    Dataset df = sqlContext.read().parquet(files);
+    assertEquals(210, df.count());
+
+    String partitionPath = "2016/03/15";
+    String cmdStr = "repair deduplicate --duplicatedPartitionPath " + partitionPath
+        + " --repairedOutputPath " + repairedOutputPath + " --sparkMaster local";
+    CommandResult cr = getShell().executeCommand(cmdStr);
+    assertTrue(cr.isSuccess());
+    assertEquals(RepairsCommand.DEDUPLICATE_RETURN_PREFIX + repairedOutputPath, cr.getResult().toString());
+
+    // After deduplicate, there are 200 records
+    FileStatus[] fileStatus = fs.listStatus(new Path(repairedOutputPath));
+    files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new);
+    Dataset result = sqlContext.read().parquet(files);
+    assertEquals(200, result.count());
+  }
+
+  /**
+   * Test case for real run deduplicate.
+   */
+  @Test
+  public void testDeduplicateWithReal() throws IOException {
+    // get fs and check number of latest files
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+        fs.listStatus(new Path(duplicatedPartitionPath)));
+    List<String> filteredStatuses = fsView.getLatestBaseFiles().map(f -> f.getPath()).collect(Collectors.toList());
+    assertEquals("There should be 3 files.", 3, filteredStatuses.size());
+
+    // Before deduplicate, all files contain 210 records
+    String[] files = filteredStatuses.toArray(new String[filteredStatuses.size()]);
+    Dataset df = sqlContext.read().parquet(files);
+    assertEquals(210, df.count());
+
+    String partitionPath = "2016/03/15";

Review comment:
       Same here: let us use the constant.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1554: [HUDI-704]Add test for RepairsCommand

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1554:
URL: https://github.com/apache/incubator-hudi/pull/1554#discussion_r421734414



##########
File path: hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
##########
@@ -64,19 +69,35 @@ public String deduplicate(
       @CliOption(key = {"repairedOutputPath"}, help = "Location to place the repaired files",
           mandatory = true) final String repairedOutputPath,
       @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path",
-          mandatory = true) final String sparkPropertiesPath)
+          unspecifiedDefaultValue = "") String sparkPropertiesPath,
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master,
+      @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
+          help = "Spark executor memory") final String sparkMemory,
+      @CliOption(key = {"dryrun"},
+          help = "Should we actually remove duplicates or just run and store result to repairedOutputPath",
+          unspecifiedDefaultValue = "true") final boolean dryRun)
       throws Exception {
+    if (StringUtils.isNullOrEmpty(sparkPropertiesPath)) {
+      sparkPropertiesPath =
+          Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
+    }
+
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), duplicatedPartitionPath, repairedOutputPath,
-        HoodieCLI.getTableMetaClient().getBasePath());
+    sparkLauncher.addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), master, sparkMemory,

Review comment:
       @hddong Yeah, it's been open for some time now. The work was mostly done; I was stuck on fixing the test cases. Will take a look at it soon. :) 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] hddong commented on a change in pull request #1554: [HUDI-704]Add test for RepairsCommand

Posted by GitBox <gi...@apache.org>.
hddong commented on a change in pull request #1554:
URL: https://github.com/apache/incubator-hudi/pull/1554#discussion_r421387588



##########
File path: hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
##########
@@ -64,19 +69,35 @@ public String deduplicate(
       @CliOption(key = {"repairedOutputPath"}, help = "Location to place the repaired files",
           mandatory = true) final String repairedOutputPath,
       @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path",
-          mandatory = true) final String sparkPropertiesPath)
+          unspecifiedDefaultValue = "") String sparkPropertiesPath,
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master,
+      @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
+          help = "Spark executor memory") final String sparkMemory,
+      @CliOption(key = {"dryrun"},
+          help = "Should we actually remove duplicates or just run and store result to repairedOutputPath",
+          unspecifiedDefaultValue = "true") final boolean dryRun)
       throws Exception {
+    if (StringUtils.isNullOrEmpty(sparkPropertiesPath)) {
+      sparkPropertiesPath =
+          Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
+    }
+
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), duplicatedPartitionPath, repairedOutputPath,
-        HoodieCLI.getTableMetaClient().getBasePath());
+    sparkLauncher.addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), master, sparkMemory,

Review comment:
       > The same suggestion, we should try to define a data structure? We can refactor it later.
   
   We can focus on PR #1174, but it has been left behind for too long.
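
   A rough illustration of the kind of data structure suggested above (the class name and its field order are hypothetical and simply mirror the positional args passed to the launcher, so treat it as a sketch rather than the actual refactoring):

       // Hypothetical holder for the deduplicate launcher arguments.
       class DeduplicateArgs {
         final String master;
         final String sparkMemory;
         final String duplicatedPartitionPath;
         final String repairedOutputPath;
         final String basePath;
         final boolean dryRun;

         DeduplicateArgs(String master, String sparkMemory, String duplicatedPartitionPath,
             String repairedOutputPath, String basePath, boolean dryRun) {
           this.master = master;
           this.sparkMemory = sparkMemory;
           this.duplicatedPartitionPath = duplicatedPartitionPath;
           this.repairedOutputPath = repairedOutputPath;
           this.basePath = basePath;
           this.dryRun = dryRun;
         }

         // Flatten back into the positional form expected by SparkLauncher#addAppArgs.
         String[] toAppArgs() {
           return new String[] {SparkMain.SparkCommand.DEDUPLICATE.toString(), master, sparkMemory,
               duplicatedPartitionPath, repairedOutputPath, basePath, String.valueOf(dryRun)};
         }
       }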




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] hddong commented on a change in pull request #1554: [HUDI-704]Add test for RepairsCommand

Posted by GitBox <gi...@apache.org>.
hddong commented on a change in pull request #1554:
URL: https://github.com/apache/incubator-hudi/pull/1554#discussion_r416283181



##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.RepairsCommand;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.spark.sql.Dataset;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.spark.sql.functions.lit;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test class for {@link RepairsCommand#deduplicate}.
+ */
+public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
+  String duplicatedPartitionPath;
+  String repairedOutputPath;
+
+  @Before
+  public void init() throws IOException, URISyntaxException {
+    String tablePath = basePath + File.separator + "test_table";
+    duplicatedPartitionPath = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    repairedOutputPath = basePath + File.separator + "tmp";
+
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+    // generate 200 records
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+
+    String fileName1 = "1_0_20160401010101.parquet";
+    String fileName2 = "2_0_20160401010101.parquet";
+
+    List<HoodieRecord> hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName1, hoodieRecords1, schema, null, false);
+    List<HoodieRecord> hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName2, hoodieRecords2, schema, null, false);
+
+    // generate commit file
+    String fileId1 = UUID.randomUUID().toString();
+    String testWriteToken = "1-0-1";
+    String commitTime = FSUtils.getCommitTime(fileName1);
+    new File(duplicatedPartitionPath + "/"
+        + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime, 1, testWriteToken))
+        .createNewFile();
+    new File(tablePath + "/.hoodie/" + commitTime + ".commit").createNewFile();
+
+    // read records and get 10 to generate duplicates
+    Dataset df = sqlContext.read().parquet(duplicatedPartitionPath);
+
+    String fileName3 = "3_0_20160401010101.parquet";
+    df.limit(10).withColumn("_hoodie_commit_time", lit("20160401010202"))

Review comment:
       > This seems a bit misleading. If the file has time of 20160401010101, how can the records have time of 20160401010202? Rather we should have the file as 3_0_20160401010202.parquet and generate one more .commit file in meta folder for this.
   > 
   > Please correct me if I am missing something.
   
   Yes, you are right, it may be misleading; I have addressed it, thanks.
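
   For reference, a minimal sketch of the adjusted setup (illustrative only: the file name, the records' _hoodie_commit_time, and a new commit file all use 20160401010202, per the suggestion above):

       String fileName3 = "3_0_20160401010202.parquet";
       df.limit(10).withColumn("_hoodie_commit_time", lit("20160401010202"))
           .write().parquet(duplicatedPartitionPath + File.separator + fileName3);
       // Matching commit file so the new commit time also appears on the timeline.
       new File(tablePath + "/.hoodie/20160401010202.commit").createNewFile();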




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org