Posted to commits@hudi.apache.org by vi...@apache.org on 2020/05/07 15:03:45 UTC

[incubator-hudi] branch master updated: [HUDI-704] Add test for RepairsCommand (#1554)

This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f921469  [HUDI-704] Add test for RepairsCommand (#1554)
f921469 is described below

commit f921469afcd03baaf2a4f3266c8ba97e516d6935
Author: hongdd <jn...@163.com>
AuthorDate: Thu May 7 23:02:28 2020 +0800

    [HUDI-704] Add test for RepairsCommand (#1554)
---
 .../apache/hudi/cli/HoodieTableHeaderFields.java   |   9 +
 .../apache/hudi/cli/commands/RepairsCommand.java   |  38 +++-
 .../org/apache/hudi/cli/commands/SparkMain.java    |  11 +-
 .../scala/org/apache/hudi/cli/SparkHelpers.scala   |   4 +
 .../hudi/cli/commands/TestRepairsCommand.java      | 207 +++++++++++++++++++++
 .../hudi/cli/integ/ITTestRepairsCommand.java       | 179 ++++++++++++++++++
 .../src/test/resources/table-config.properties     |  21 +++
 7 files changed, 457 insertions(+), 12 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
index 3b398e3..5e31e5c 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
@@ -72,4 +72,13 @@ public class HoodieTableHeaderFields {
   public static final String HEADER_DELTA_BASE_UNSCHEDULED = "Delta To Base Ratio" + COMPACTION_UNSCHEDULED_SUFFIX;
   public static final String HEADER_DELTA_FILES_SCHEDULED = "Delta Files" + COMPACTION_SCHEDULED_SUFFIX;
   public static final String HEADER_DELTA_FILES_UNSCHEDULED = "Delta Files" + COMPACTION_UNSCHEDULED_SUFFIX;
+
+  /**
+   * Fields of Repair.
+   */
+  public static final String HEADER_METADATA_PRESENT = "Metadata Present?";
+  public static final String HEADER_REPAIR_ACTION = "Action";
+  public static final String HEADER_HOODIE_PROPERTY = "Property";
+  public static final String HEADER_OLD_VALUE = "Old Value";
+  public static final String HEADER_NEW_VALUE = "New Value";
 }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
index 41de7d2..0af9ff2 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
@@ -20,6 +20,7 @@ package org.apache.hudi.cli.commands;
 
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
 import org.apache.hudi.cli.utils.InputStreamConsumer;
 import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.common.fs.FSUtils;
@@ -30,12 +31,15 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.CleanerUtils;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.util.Utils;
 import org.springframework.shell.core.CommandMarker;
 import org.springframework.shell.core.annotation.CliCommand;
 import org.springframework.shell.core.annotation.CliOption;
 import org.springframework.stereotype.Component;
+import scala.collection.JavaConverters;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -55,6 +59,7 @@ import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME
 public class RepairsCommand implements CommandMarker {
 
   private static final Logger LOG = Logger.getLogger(RepairsCommand.class);
+  public static final String DEDUPLICATE_RETURN_PREFIX = "Deduplicated files placed in:  ";
 
   @CliCommand(value = "repair deduplicate",
       help = "De-duplicate a partition path contains duplicates & produce repaired files to replace with")
@@ -64,19 +69,35 @@ public class RepairsCommand implements CommandMarker {
       @CliOption(key = {"repairedOutputPath"}, help = "Location to place the repaired files",
           mandatory = true) final String repairedOutputPath,
       @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path",
-          mandatory = true) final String sparkPropertiesPath)
+          unspecifiedDefaultValue = "") String sparkPropertiesPath,
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
+      @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
+          help = "Spark executor memory") final String sparkMemory,
+      @CliOption(key = {"dryrun"},
+          help = "Should we actually remove duplicates or just run and store result to repairedOutputPath",
+          unspecifiedDefaultValue = "true") final boolean dryRun)
       throws Exception {
+    if (StringUtils.isNullOrEmpty(sparkPropertiesPath)) {
+      sparkPropertiesPath =
+          Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
+    }
+
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), duplicatedPartitionPath, repairedOutputPath,
-        HoodieCLI.getTableMetaClient().getBasePath());
+    sparkLauncher.addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), master, sparkMemory,
+        duplicatedPartitionPath, repairedOutputPath, HoodieCLI.getTableMetaClient().getBasePath(),
+        String.valueOf(dryRun));
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);
     int exitCode = process.waitFor();
 
     if (exitCode != 0) {
-      return "Deduplicated files placed in:  " + repairedOutputPath;
+      return "Deduplication failed!";
+    }
+    if (dryRun) {
+      return DEDUPLICATE_RETURN_PREFIX + repairedOutputPath;
+    } else {
+      return DEDUPLICATE_RETURN_PREFIX + duplicatedPartitionPath;
     }
-    return "Deduplication failed ";
   }
 
   @CliCommand(value = "repair addpartitionmeta", help = "Add partition metadata to a table, if not present")
@@ -106,12 +127,14 @@ public class RepairsCommand implements CommandMarker {
           HoodiePartitionMetadata partitionMetadata =
               new HoodiePartitionMetadata(HoodieCLI.fs, latestCommit, basePath, partitionPath);
           partitionMetadata.trySave(0);
+          row[2] = "Repaired";
         }
       }
       rows[ind++] = row;
     }
 
-    return HoodiePrintHelper.print(new String[] {"Partition Path", "Metadata Present?", "Action"}, rows);
+    return HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
+        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
   }
 
   @CliCommand(value = "repair overwrite-hoodie-props", help = "Overwrite hoodie.properties with provided file. Risky operation. Proceed with caution!")
@@ -140,7 +163,8 @@ public class RepairsCommand implements CommandMarker {
       };
       rows[ind++] = row;
     }
-    return HoodiePrintHelper.print(new String[] {"Property", "Old Value", "New Value"}, rows);
+    return HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY,
+        HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows);
   }
 
   @CliCommand(value = "repair corrupted clean files", help = "repair corrupted clean files")
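
With this change, "repair deduplicate" no longer requires --sparkProperties (when omitted, a default Spark properties file is resolved from the environment) and gains --sparkMaster, --sparkMemory and --dryrun options, with --dryrun defaulting to true and --sparkMemory to 4G. As a rough sketch (the partition and output paths below are hypothetical), an invocation from the hudi-cli shell could look like:

    repair deduplicate --duplicatedPartitionPath <partition-path> --repairedOutputPath /tmp/repaired --sparkMaster local --sparkMemory 4G --dryrun false

Per the return messages above and the integration test added below, a dry run only writes the repaired files to --repairedOutputPath and reports that location, while --dryrun false fixes the duplicates in the partition itself and reports the partition path.
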
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 5aa2255..5d8972d 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -73,8 +73,8 @@ public class SparkMain {
         returnCode = rollback(jsc, args[1], args[2]);
         break;
       case DEDUPLICATE:
-        assert (args.length == 4);
-        returnCode = deduplicatePartitionPath(jsc, args[1], args[2], args[3]);
+        assert (args.length == 7);
+        returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], args[6]);
         break;
       case ROLLBACK_TO_SAVEPOINT:
         assert (args.length == 3);
@@ -162,7 +162,8 @@ public class SparkMain {
 
   private static boolean sparkMasterContained(SparkCommand command) {
     List<SparkCommand> masterContained = Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR,
-        SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN);
+        SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN,
+        SparkCommand.DEDUPLICATE);
     return masterContained.contains(command);
   }
 
@@ -263,10 +264,10 @@ public class SparkMain {
   }
 
   private static int deduplicatePartitionPath(JavaSparkContext jsc, String duplicatedPartitionPath,
-      String repairedOutputPath, String basePath) {
+      String repairedOutputPath, String basePath, String dryRun) {
     DedupeSparkJob job = new DedupeSparkJob(basePath, duplicatedPartitionPath, repairedOutputPath, new SQLContext(jsc),
         FSUtils.getFs(basePath, jsc.hadoopConfiguration()));
-    job.fixDuplicates(true);
+    job.fixDuplicates(Boolean.parseBoolean(dryRun));
     return 0;
   }
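
For reference, the CLI now passes seven arguments for DEDUPLICATE and SparkMain consumes the last four, with args[1] and args[2] carrying the Spark master and executor memory. A minimal sketch of that layout (illustrative only; the class and method names here are hypothetical and simply mirror the addAppArgs call in RepairsCommand above):

    // Hypothetical sketch of the seven-element argv that SparkMain's DEDUPLICATE case expects.
    public class DeduplicateArgsSketch {
      public static String[] buildArgs(String master, String sparkMemory, String duplicatedPartitionPath,
          String repairedOutputPath, String basePath, boolean dryRun) {
        return new String[] {
            "DEDUPLICATE",            // args[0]: SparkCommand, dispatched on in SparkMain
            master,                   // args[1]: --sparkMaster
            sparkMemory,              // args[2]: --sparkMemory
            duplicatedPartitionPath,  // args[3]
            repairedOutputPath,       // args[4]
            basePath,                 // args[5]
            String.valueOf(dryRun)    // args[6]
        };
        // SparkMain asserts args.length == 7 and calls
        // deduplicatePartitionPath(jsc, args[3], args[4], args[5], args[6]).
      }
    }
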
 
diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
index a229a15..7de5f42 100644
--- a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
+++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
@@ -46,6 +46,10 @@ object SparkHelpers {
       HoodieIndexConfig.DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_INDEX_FILTER_TYPE);
     val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter)
     val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf, HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO.toDouble)
+
+    // Set the current thread's context class loader on the config; otherwise class loading fails with ClassNotFoundException for 'HoodieWrapperFileSystem'.
+    parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
+
     val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](instantTime, destinationFile, parquetConfig, schema, new SparkTaskContextSupplier())
     for (rec <- sourceRecords) {
       val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
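
The class loader line added above avoids a ClassNotFoundException for 'HoodieWrapperFileSystem': without it, the Hadoop configuration may try to resolve the class with a loader that cannot see the Hudi jars. A minimal Java sketch of the same pattern (illustrative only, not part of this commit; the class name is hypothetical):

    import org.apache.hadoop.conf.Configuration;

    // Hypothetical helper: have a Hadoop Configuration resolve classes (e.g. a custom
    // FileSystem implementation) through the current thread's context class loader.
    public final class ConfClassLoaderSketch {
      public static Configuration withContextClassLoader(Configuration conf) {
        conf.setClassLoader(Thread.currentThread().getContextClassLoader());
        return conf;
      }
    }
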
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
new file mode 100644
index 0000000..9fd44b4
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
+import org.apache.hudi.cli.common.HoodieTestCommitMetadataGenerator;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for {@link RepairsCommand}.
+ */
+public class TestRepairsCommand extends AbstractShellIntegrationTest {
+
+  private String tablePath;
+
+  @BeforeEach
+  public void init() throws IOException {
+    String tableName = "test_table";
+    tablePath = basePath + File.separator + tableName;
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+  }
+
+  /**
+   * Test case for dry run 'repair addpartitionmeta'.
+   */
+  @Test
+  public void testAddPartitionMetaWithDryRun() throws IOException {
+    // create commit instant
+    Files.createFile(Paths.get(tablePath + "/.hoodie/100.commit"));
+
+    // create partition path
+    String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+    String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+    assertTrue(fs.mkdirs(new Path(partition1)));
+    assertTrue(fs.mkdirs(new Path(partition2)));
+    assertTrue(fs.mkdirs(new Path(partition3)));
+
+    // default is dry run.
+    CommandResult cr = getShell().executeCommand("repair addpartitionmeta");
+    assertTrue(cr.isSuccess());
+
+    // expected all 'No'.
+    String[][] rows = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath)
+        .stream()
+        .map(partition -> new String[]{partition, "No", "None"})
+        .toArray(String[][]::new);
+    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
+        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
+
+    assertEquals(expected, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for real run 'repair addpartitionmeta'.
+   */
+  @Test
+  public void testAddPartitionMetaWithRealRun() throws IOException {
+    // create commit instant
+    Files.createFile(Paths.get(tablePath + "/.hoodie/100.commit"));
+
+    // create partition path
+    String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+    String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+    assertTrue(fs.mkdirs(new Path(partition1)));
+    assertTrue(fs.mkdirs(new Path(partition2)));
+    assertTrue(fs.mkdirs(new Path(partition3)));
+
+    CommandResult cr = getShell().executeCommand("repair addpartitionmeta --dryrun false");
+    assertTrue(cr.isSuccess());
+
+    List<String> paths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath);
+    // after the real run, the reported action is 'Repaired'
+    String[][] rows = paths.stream()
+        .map(partition -> new String[]{partition, "No", "Repaired"})
+        .toArray(String[][]::new);
+    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
+        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
+
+    assertEquals(expected, cr.getResult().toString());
+
+    cr = getShell().executeCommand("repair addpartitionmeta");
+
+    // after the real run, metadata is present now (verified via another dry run).
+    rows = paths.stream()
+        .map(partition -> new String[]{partition, "Yes", "None"})
+        .toArray(String[][]::new);
+    expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
+        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
+    assertEquals(expected, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for 'repair overwrite-hoodie-props'.
+   */
+  @Test
+  public void testOverwriteHoodieProperties() throws IOException {
+    URL newProps = this.getClass().getClassLoader().getResource("table-config.properties");
+    assertNotNull(newProps, "New property file must exist");
+
+    CommandResult cr = getShell().executeCommand("repair overwrite-hoodie-props --new-props-file " + newProps.getPath());
+    assertTrue(cr.isSuccess());
+
+    Map<String, String> oldProps = HoodieCLI.getTableMetaClient().getTableConfig().getProps();
+
+    // after overwrite, the properties stored under .hoodie should equal those read from the new properties file.
+    Map<String, String> result = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().getProps();
+    Properties expectProps = new Properties();
+    expectProps.load(new FileInputStream(new File(newProps.getPath())));
+
+    Map<String, String> expected = expectProps.entrySet().stream()
+        .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));
+    assertEquals(expected, result);
+
+    // check result
+    List<String> allPropsStr = Arrays.asList("hoodie.table.name", "hoodie.table.type",
+        "hoodie.archivelog.folder", "hoodie.timeline.layout.version");
+    String[][] rows = allPropsStr.stream().sorted().map(key -> new String[]{key,
+        oldProps.getOrDefault(key, null), result.getOrDefault(key, null)})
+        .toArray(String[][]::new);
+    String expect = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY,
+        HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows);
+
+    assertEquals(expect, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for 'repair corrupted clean files'.
+   */
+  @Test
+  public void testRemoveCorruptedPendingCleanAction() throws IOException {
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    Configuration conf = HoodieCLI.conf;
+
+    metaClient = HoodieCLI.getTableMetaClient();
+
+    // Create four requested files
+    for (int i = 100; i < 104; i++) {
+      String timestamp = String.valueOf(i);
+      // Write corrupted requested Compaction
+      HoodieTestCommitMetadataGenerator.createCompactionRequestedFile(tablePath, timestamp, conf);
+    }
+
+    // reload meta client
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    // first, there are four instants
+    assertEquals(4, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count());
+
+    CommandResult cr = getShell().executeCommand("repair corrupted clean files");
+    assertTrue(cr.isSuccess());
+
+    // reload meta client
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    assertEquals(0, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count());
+  }
+}
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
new file mode 100644
index 0000000..4f48bc3
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.RepairsCommand;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.spark.sql.Dataset;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.spark.sql.functions.lit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test class for {@link RepairsCommand#deduplicate}.
+ * <p/>
+ * A command that uses SparkLauncher needs to load the jars under lib, which are only generated during mvn package,
+ * so this is an integration test rather than a unit test.
+ */
+public class ITTestRepairsCommand extends AbstractShellIntegrationTest {
+
+  private String duplicatedPartitionPath;
+  private String repairedOutputPath;
+
+  @BeforeEach
+  public void init() throws IOException, URISyntaxException {
+    String tablePath = basePath + File.separator + "test_table";
+    duplicatedPartitionPath = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    repairedOutputPath = basePath + File.separator + "tmp";
+
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+    // generate 200 records
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+
+    String fileName1 = "1_0_20160401010101.parquet";
+    String fileName2 = "2_0_20160401010101.parquet";
+
+    List<HoodieRecord> hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName1, hoodieRecords1, schema, null, false);
+    List<HoodieRecord> hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema);
+    HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        fileName2, hoodieRecords2, schema, null, false);
+
+    // generate commit file
+    String fileId1 = UUID.randomUUID().toString();
+    String testWriteToken = "1-0-1";
+    String commitTime = FSUtils.getCommitTime(fileName1);
+    Files.createFile(Paths.get(duplicatedPartitionPath + "/"
+        + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime, 1, testWriteToken)));
+    Files.createFile(Paths.get(tablePath + "/.hoodie/" + commitTime + ".commit"));
+
+    // read records and get 10 to generate duplicates
+    Dataset df = sqlContext.read().parquet(duplicatedPartitionPath);
+
+    String fileName3 = "3_0_20160401010202.parquet";
+    commitTime = FSUtils.getCommitTime(fileName3);
+    df.limit(10).withColumn("_hoodie_commit_time", lit(commitTime))
+        .write().parquet(duplicatedPartitionPath + File.separator + fileName3);
+    Files.createFile(Paths.get(tablePath + "/.hoodie/" + commitTime + ".commit"));
+
+    metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+  }
+
+  /**
+   * Test case for dry run deduplicate.
+   */
+  @Test
+  public void testDeduplicate() throws IOException {
+    // get fs and check number of latest files
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+        fs.listStatus(new Path(duplicatedPartitionPath)));
+    List<String> filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+    assertEquals(3, filteredStatuses.size(), "There should be 3 files.");
+
+    // Before deduplication, the three base files together contain 210 records
+    String[] files = filteredStatuses.toArray(new String[0]);
+    Dataset df = sqlContext.read().parquet(files);
+    assertEquals(210, df.count());
+
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String cmdStr = String.format("repair deduplicate --duplicatedPartitionPath %s --repairedOutputPath %s --sparkMaster %s",
+        partitionPath, repairedOutputPath, "local");
+    CommandResult cr = getShell().executeCommand(cmdStr);
+    assertTrue(cr.isSuccess());
+    assertEquals(RepairsCommand.DEDUPLICATE_RETURN_PREFIX + repairedOutputPath, cr.getResult().toString());
+
+    // After deduplication, there are 200 records
+    FileStatus[] fileStatus = fs.listStatus(new Path(repairedOutputPath));
+    files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new);
+    Dataset result = sqlContext.read().parquet(files);
+    assertEquals(200, result.count());
+  }
+
+  /**
+   * Test case for real run deduplicate.
+   */
+  @Test
+  public void testDeduplicateWithReal() throws IOException {
+    // get fs and check number of latest files
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+        fs.listStatus(new Path(duplicatedPartitionPath)));
+    List<String> filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+    assertEquals(3, filteredStatuses.size(), "There should be 3 files.");
+
+    // Before deduplication, the three base files together contain 210 records
+    String[] files = filteredStatuses.toArray(new String[0]);
+    Dataset df = sqlContext.read().parquet(files);
+    assertEquals(210, df.count());
+
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String cmdStr = String.format("repair deduplicate --duplicatedPartitionPath %s --repairedOutputPath %s"
+        + " --sparkMaster %s --dryrun %s", partitionPath, repairedOutputPath, "local", false);
+    CommandResult cr = getShell().executeCommand(cmdStr);
+    assertTrue(cr.isSuccess());
+    assertEquals(RepairsCommand.DEDUPLICATE_RETURN_PREFIX + partitionPath, cr.getResult().toString());
+
+    // After deduplication, there are 200 records under the partition path
+    FileStatus[] fileStatus = fs.listStatus(new Path(duplicatedPartitionPath));
+    files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new);
+    Dataset result = sqlContext.read().parquet(files);
+    assertEquals(200, result.count());
+  }
+}
diff --git a/hudi-cli/src/test/resources/table-config.properties b/hudi-cli/src/test/resources/table-config.properties
new file mode 100644
index 0000000..d74c044
--- /dev/null
+++ b/hudi-cli/src/test/resources/table-config.properties
@@ -0,0 +1,21 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+hoodie.table.name=test_table
+hoodie.table.type=COPY_ON_WRITE
+hoodie.archivelog.folder=archive
+hoodie.timeline.layout.version=1