You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@hudi.apache.org by si...@apache.org on 2020/05/28 00:19:13 UTC

[hudi] 30/40: [HUDI-820] cleaner repair command should only inspect clean metadata files (#1542)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4095de4d45f174b45dd1310a6a99c4fecfb2466a
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Sun May 10 18:25:54 2020 -0700

    [HUDI-820] cleaner repair command should only inspect clean metadata files (#1542)
---
 hudi-cli/pom.xml                                   |  21 +++
 .../apache/hudi/cli/HoodieTableHeaderFields.java   |  36 ++++
 .../apache/hudi/cli/commands/RepairsCommand.java   |  21 ++-
 .../cli/commands/AbstractShellIntegrationTest.java |  61 ++++++
 .../HoodieTestCommitMetadataGenerator.java         | 137 ++++++++++++++
 .../hudi/cli/commands/TestRepairsCommand.java      | 206 +++++++++++++++++++++
 .../hudi/common/HoodieTestDataGenerator.java       |  17 +-
 .../apache/hudi/common/util/CollectionUtils.java   | 111 +++++++++++
 8 files changed, 601 insertions(+), 9 deletions(-)

diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml
index a5e358c..7b63e23 100644
--- a/hudi-cli/pom.xml
+++ b/hudi-cli/pom.xml
@@ -150,6 +150,27 @@
       <artifactId>hudi-utilities_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-common</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-client</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-utilities_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
 
     <!-- Logging -->
     <dependency>
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
new file mode 100644
index 0000000..2e3bc01
--- /dev/null
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli;
+
+/**
+ * Column header labels shared by hudi-cli commands when printing result tables.
+ */
+public class HoodieTableHeaderFields {
+
+  // Partition columns.
+  public static final String HEADER_PARTITION = "Partition";
+  public static final String HEADER_PARTITION_PATH = HEADER_PARTITION + " Path";
+
+  // Columns printed by the repair commands.
+  public static final String HEADER_METADATA_PRESENT = "Metadata Present?";
+  public static final String HEADER_REPAIR_ACTION = "Action";
+  public static final String HEADER_HOODIE_PROPERTY = "Property";
+  public static final String HEADER_OLD_VALUE = "Old Value";
+  public static final String HEADER_NEW_VALUE = "New Value";
+}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
index 7a65336..7feebed 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
@@ -25,10 +25,12 @@ import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.FSUtils;
+import org.apache.hudi.exception.HoodieIOException;
 
+import org.apache.avro.AvroRuntimeException;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 import org.apache.spark.launcher.SparkLauncher;
@@ -147,14 +149,21 @@ public class RepairsCommand implements CommandMarker {
   public void removeCorruptedPendingCleanAction() {
 
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
-    HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
-
-    activeTimeline.filterInflightsAndRequested().getInstants().forEach(instant -> {
+    HoodieTimeline cleanerTimeline = client.getActiveTimeline().getCleanerTimeline();
+    LOG.info("Inspecting pending clean metadata in timeline for corrupted files");
+    cleanerTimeline.filterInflightsAndRequested().getInstants().forEach(instant -> {
       try {
         CleanerUtils.getCleanerPlan(client, instant);
-      } catch (IOException e) {
-        LOG.warn("try to remove corrupted instant file: " + instant);
+      } catch (AvroRuntimeException e) {
+        LOG.warn("Corruption found. Trying to remove corrupted clean instant file: " + instant);
         FSUtils.deleteInstantFile(client.getFs(), client.getMetaPath(), instant);
+      } catch (IOException ioe) {
+        // Only the "Not an Avro data file" failure counts as corruption; other I/O errors (or a null message) are rethrown.
+        if (ioe.getMessage() == null || !ioe.getMessage().contains("Not an Avro data file")) {
+          throw new HoodieIOException(ioe.getMessage(), ioe);
+        }
+        LOG.warn("Corruption found. Trying to remove corrupted clean instant file: " + instant);
+        FSUtils.deleteInstantFile(client.getFs(), client.getMetaPath(), instant);
       }
     });
   }
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java
new file mode 100644
index 0000000..ad81af5
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.common.HoodieClientTestHarness;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.springframework.shell.Bootstrap;
+import org.springframework.shell.core.JLineShellComponent;
+
+/** Base for hudi-cli shell tests: one Spring Shell per test class, harness resources set up around each test. */
+public abstract class AbstractShellIntegrationTest extends HoodieClientTestHarness {
+
+  private static JLineShellComponent shell;
+
+  @BeforeClass
+  public static void startup() {
+    shell = new Bootstrap().getJLineShellComponent();
+  }
+
+  @AfterClass
+  public static void shutdown() {
+    // startup() may have failed; do not add an NPE on top of that failure.
+    if (shell != null) {
+      shell.stop();
+    }
+  }
+
+  @Before
+  public void setup() throws Exception {
+    initResources();
+  }
+
+  @After
+  public void teardown() throws Exception {
+    cleanupResources();
+  }
+
+  protected static JLineShellComponent getShell() {
+    return shell;
+  }
+}
\ No newline at end of file
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/HoodieTestCommitMetadataGenerator.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/HoodieTestCommitMetadataGenerator.java
new file mode 100644
index 0000000..7abad66
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/HoodieTestCommitMetadataGenerator.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTestUtils;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test helper that writes commit instant files whose content is generated {@link HoodieCommitMetadata},
+ * reusing the partition defaults of {@link HoodieTestDataGenerator}.
+ */
+public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {
+
+  // default commit metadata value
+  public static final String DEFAULT_PATH = "path";
+  public static final String DEFAULT_FILEID = "fileId";
+  public static final int DEFAULT_TOTAL_WRITE_BYTES = 50;
+  public static final String DEFAULT_PRE_COMMIT = "commit-1";
+  public static final int DEFAULT_NUM_WRITES = 10;
+  public static final int DEFAULT_NUM_UPDATE_WRITES = 15;
+  public static final int DEFAULT_TOTAL_LOG_BLOCKS = 1;
+  public static final int DEFAULT_TOTAL_LOG_RECORDS = 10;
+  public static final int DEFAULT_OTHER_VALUE = 0;
+  public static final String DEFAULT_NULL_VALUE = "null";
+
+  /**
+   * Create a commit file with default CommitMetadata.
+   */
+  public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration) {
+    createCommitFileWithMetadata(basePath, commitTime, configuration, Option.empty(), Option.empty());
+  }
+
+  /**
+   * Creates the completed, inflight and requested commit files for {@code commitTime}. Each file holds the
+   * JSON form of metadata built by {@link #generateCommitMetadata(String, String, Option, Option)}, so every
+   * file written also creates new data files; empty {@code writes}/{@code updates} fall back to the defaults.
+   *
+   * @throws HoodieIOException if generating the metadata or writing any of the files fails
+   */
+  public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
+                                                  Option<Integer> writes, Option<Integer> updates) {
+    Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime),
+        HoodieTimeline.makeRequestedCommitFileName(commitTime))
+        .forEach(f -> {
+          Path commitFile = new Path(
+              basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f);
+          try {
+            FileSystem fs = FSUtils.getFs(basePath, configuration);
+            // Build the metadata before opening the file so a failure does not leave an empty commit file.
+            HoodieCommitMetadata commitMetadata = generateCommitMetadata(basePath, commitTime, writes, updates);
+            try (FSDataOutputStream os = fs.create(commitFile, true)) {
+              // Write the JSON as UTF-8 bytes; writeBytes(String) would drop the high byte of every char.
+              os.write(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
+            }
+          } catch (IOException ioe) {
+            throw new HoodieIOException(ioe.getMessage(), ioe);
+          }
+        });
+  }
+
+  /**
+   * Generates commit metadata for {@code commitTime} with the default write and update counts.
+   */
+  public static HoodieCommitMetadata generateCommitMetadata(String basePath, String commitTime) throws IOException {
+    return generateCommitMetadata(basePath, commitTime, Option.empty(), Option.empty());
+  }
+
+  /** Creates a new data file in each of the first two default partitions and returns metadata describing them. */
+  public static HoodieCommitMetadata generateCommitMetadata(String basePath, String commitTime,
+                                                            Option<Integer> writes, Option<Integer> updates) throws IOException {
+    String file1P0C0 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, commitTime);
+    String file1P1C0 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_SECOND_PARTITION_PATH, commitTime);
+    // A plain map instead of double-brace initialization, which would create an anonymous HashMap subclass.
+    Map<String, List<String>> partitionToFilePaths = new HashMap<>();
+    partitionToFilePaths.put(DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0));
+    partitionToFilePaths.put(DEFAULT_SECOND_PARTITION_PATH, CollectionUtils.createImmutableList(file1P1C0));
+    return generateCommitMetadata(partitionToFilePaths, writes, updates);
+  }
+
+  /**
+   * Builds commit metadata with one write stat per listed file, filled from the DEFAULT_* values unless overridden.
+   */
+  private static HoodieCommitMetadata generateCommitMetadata(Map<String, List<String>> partitionToFilePaths,
+                                                             Option<Integer> writes, Option<Integer> updates) {
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+    partitionToFilePaths.forEach((key, value) -> value.forEach(f -> {
+      HoodieWriteStat writeStat = new HoodieWriteStat();
+      writeStat.setPartitionPath(key);
+      writeStat.setPath(DEFAULT_PATH);
+      writeStat.setFileId(DEFAULT_FILEID);
+      writeStat.setTotalWriteBytes(DEFAULT_TOTAL_WRITE_BYTES);
+      writeStat.setPrevCommit(DEFAULT_PRE_COMMIT);
+      writeStat.setNumWrites(writes.orElse(DEFAULT_NUM_WRITES));
+      writeStat.setNumUpdateWrites(updates.orElse(DEFAULT_NUM_UPDATE_WRITES));
+      writeStat.setTotalLogBlocks(DEFAULT_TOTAL_LOG_BLOCKS);
+      writeStat.setTotalLogRecords(DEFAULT_TOTAL_LOG_RECORDS);
+      metadata.addWriteStat(key, writeStat);
+    }));
+    return metadata;
+  }
+}
\ No newline at end of file
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
new file mode 100644
index 0000000..1192915
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.TimelineLayoutVersion;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.FSUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test class for {@link RepairsCommand}.
+ */
+public class TestRepairsCommand extends AbstractShellIntegrationTest {
+
+  private String tablePath;
+
+  @Before
+  public void init() throws IOException {
+    String tableName = "test_table";
+    tablePath = basePath + File.separator + tableName;
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+  }
+
+  /**
+   * Test case for dry run 'repair addpartitionmeta'.
+   */
+  @Test
+  public void testAddPartitionMetaWithDryRun() throws IOException {
+    // create commit instant
+    Files.createFile(Paths.get(tablePath + "/.hoodie/100.commit"));
+
+    // create partition path
+    String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+    String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+    assertTrue(fs.mkdirs(new Path(partition1)));
+    assertTrue(fs.mkdirs(new Path(partition2)));
+    assertTrue(fs.mkdirs(new Path(partition3)));
+
+    // default is dry run.
+    CommandResult cr = getShell().executeCommand("repair addpartitionmeta");
+    assertTrue(cr.isSuccess());
+
+    // expected all 'No'.
+    String[][] rows = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath)
+        .stream()
+        .map(partition -> new String[] {partition, "No", "None"})
+        .toArray(String[][]::new);
+    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
+        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
+
+    assertEquals(expected, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for real run 'repair addpartitionmeta'.
+   */
+  @Test
+  public void testAddPartitionMetaWithRealRun() throws IOException {
+    // create commit instant
+    Files.createFile(Paths.get(tablePath + "/.hoodie/100.commit"));
+
+    // create partition path
+    String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+    String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+    assertTrue(fs.mkdirs(new Path(partition1)));
+    assertTrue(fs.mkdirs(new Path(partition2)));
+    assertTrue(fs.mkdirs(new Path(partition3)));
+
+    CommandResult cr = getShell().executeCommand("repair addpartitionmeta --dryrun false");
+    assertTrue(cr.isSuccess());
+
+    List<String> paths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath);
+    // the real run reports every partition (metadata not yet present) as 'Repaired'
+    String[][] rows = paths.stream()
+        .map(partition -> new String[] {partition, "No", "Repaired"})
+        .toArray(String[][]::new);
+    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
+        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
+
+    assertEquals(expected, cr.getResult().toString());
+
+    cr = getShell().executeCommand("repair addpartitionmeta");
+    assertTrue(cr.isSuccess());
+
+    // a follow-up dry run now finds the metadata present, so no action is needed.
+    rows = paths.stream()
+        .map(partition -> new String[] {partition, "Yes", "None"})
+        .toArray(String[][]::new);
+    expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
+        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
+    assertEquals(expected, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for 'repair overwrite-hoodie-props'.
+   */
+  @Test
+  public void testOverwriteHoodieProperties() throws IOException {
+    URL newProps = this.getClass().getClassLoader().getResource("table-config.properties");
+    assertNotNull("New property file must exist", newProps);
+
+    CommandResult cr = getShell().executeCommand("repair overwrite-hoodie-props --new-props-file " + newProps.getPath());
+    assertTrue(cr.isSuccess());
+
+    Map<String, String> oldProps = HoodieCLI.getTableMetaClient().getTableConfig().getProps();
+
+    // after overwrite, the stored value in .hoodie is equals to which read from properties.
+    Map<String, String> result = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().getProps();
+    Properties expectProps = new Properties();
+    try (FileInputStream in = new FileInputStream(new File(newProps.getPath()))) {
+      expectProps.load(in);
+    }
+
+    Map<String, String> expected = expectProps.entrySet().stream()
+        .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));
+    assertEquals(expected, result);
+
+    // check result
+    List<String> allPropsStr = Arrays.asList("hoodie.table.name", "hoodie.table.type",
+        "hoodie.archivelog.folder", "hoodie.timeline.layout.version");
+    String[][] rows = allPropsStr.stream().sorted().map(key -> new String[] {key,
+        oldProps.getOrDefault(key, null), result.getOrDefault(key, null)})
+        .toArray(String[][]::new);
+    String expect = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY,
+        HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows);
+
+    assertEquals(expect, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for 'repair corrupted clean files'.
+   */
+  @Test
+  public void testRemoveCorruptedPendingCleanAction() throws IOException {
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+    Configuration conf = HoodieCLI.conf;
+    metaClient = HoodieCLI.getTableMetaClient();
+
+    // Create four requested files
+    for (int i = 100; i < 104; i++) {
+      String timestamp = String.valueOf(i);
+      // Write a corrupted (empty) requested clean file; the command only inspects the cleaner timeline
+      HoodieTestCommitMetadataGenerator.createEmptyCleanRequestedFile(tablePath, timestamp, conf);
+    }
+
+    // reload meta client
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    // first, there are four instants
+    assertEquals(4, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count());
+
+    CommandResult cr = getShell().executeCommand("repair corrupted clean files");
+    assertTrue(cr.isSuccess());
+
+    // reload meta client
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    assertEquals(0, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count());
+  }
+}
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
index 99bea63..0ec3bc8 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
@@ -216,12 +216,23 @@ public class HoodieTestDataGenerator {
         });
   }
 
-  public static void createCompactionRequestedFile(String basePath, String commitTime, Configuration configuration)
+  public static void createEmptyCleanRequestedFile(String basePath, String instantTime, Configuration configuration)
       throws IOException {
     Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
-        + HoodieTimeline.makeRequestedCompactionFileName(commitTime));
+        + HoodieTimeline.makeRequestedCleanerFileName(instantTime));
+    createEmptyFile(basePath, commitFile, configuration);
+  }
+
+  /** Creates an empty requested compaction instant file for {@code instantTime} in the table's meta folder. */
+  public static void createCompactionRequestedFile(String basePath, String instantTime, Configuration configuration)
+      throws IOException {
+    createEmptyFile(basePath, new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+        + HoodieTimeline.makeRequestedCompactionFileName(instantTime)), configuration);
+  }
+
+  private static void createEmptyFile(String basePath, Path emptyFile, Configuration configuration) throws IOException {
     FileSystem fs = FSUtils.getFs(basePath, configuration);
-    FSDataOutputStream os = fs.create(commitFile, true);
+    FSDataOutputStream os = fs.create(emptyFile, true);
     os.close();
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
new file mode 100644
index 0000000..15e8bea
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Helpers for comparing iterators and for building collections, mostly unmodifiable ones.
+ *
+ * <p>The {@code createImmutable*} overloads that take an existing collection return views, not copies.
+ */
+public class CollectionUtils {
+  /**
+   * Determines whether two iterators contain equal elements in the same order. More specifically,
+   * this method returns {@code true} if {@code iterator1} and {@code iterator2} contain the same
+   * number of elements and every element of {@code iterator1} is equal to the corresponding element
+   * of {@code iterator2}.
+   *
+   * <p>Note that this will modify the supplied iterators, since they will have been advanced some
+   * number of elements forward.
+   */
+  public static boolean elementsEqual(Iterator<?> iterator1, Iterator<?> iterator2) {
+    while (iterator1.hasNext()) {
+      if (!iterator2.hasNext()) {
+        return false;
+      }
+      Object o1 = iterator1.next();
+      Object o2 = iterator2.next();
+      if (!Objects.equals(o1, o2)) {
+        return false;
+      }
+    }
+    return !iterator2.hasNext();
+  }
+
+  /** Collects the given elements into a {@link Set}; duplicates collapse and no mutability is guaranteed. */
+  @SafeVarargs
+  public static <T> Set<T> createSet(final T... elements) {
+    return Stream.of(elements).collect(Collectors.toSet());
+  }
+
+  /** Returns an unmodifiable map holding the single entry {@code key -> value}. */
+  public static <K,V> Map<K, V> createImmutableMap(final K key, final V value) {
+    return Collections.unmodifiableMap(Collections.singletonMap(key, value));
+  }
+
+  /** Returns an unmodifiable list of the given elements, in argument order. */
+  @SafeVarargs
+  public static <T> List<T> createImmutableList(final T... elements) {
+    return Collections.unmodifiableList(Stream.of(elements).collect(Collectors.toList()));
+  }
+
+  /** Returns an unmodifiable view of {@code map}; later changes to {@code map} stay visible through it. */
+  public static <K,V> Map<K,V> createImmutableMap(final Map<K,V> map) {
+    return Collections.unmodifiableMap(map);
+  }
+
+  /**
+   * Returns an unmodifiable map built from the given pairs; for repeated keys the last pair wins.
+   */
+  @SafeVarargs
+  public static <K,V> Map<K,V> createImmutableMap(final Pair<K,V>... elements) {
+    Map<K,V> map = new HashMap<>();
+    for (Pair<K,V> pair: elements) {
+      map.put(pair.getLeft(), pair.getRight());
+    }
+    return Collections.unmodifiableMap(map);
+  }
+
+  /** Returns an unmodifiable set of the given elements. */
+  @SafeVarargs
+  public static <T> Set<T> createImmutableSet(final T... elements) {
+    return Collections.unmodifiableSet(createSet(elements));
+  }
+
+  /** Returns an unmodifiable view of {@code set}. */
+  public static <T> Set<T> createImmutableSet(final Set<T> set) {
+    return Collections.unmodifiableSet(set);
+  }
+
+  /** Returns an unmodifiable view of {@code list}. */
+  public static <T> List<T> createImmutableList(final List<T> list) {
+    return Collections.unmodifiableList(list);
+  }
+}
\ No newline at end of file