You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2020/05/28 00:19:13 UTC
[hudi] 30/40: [HUDI-820] cleaner repair command should only inspect
clean metadata files (#1542)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4095de4d45f174b45dd1310a6a99c4fecfb2466a
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Sun May 10 18:25:54 2020 -0700
[HUDI-820] cleaner repair command should only inspect clean metadata files (#1542)
---
hudi-cli/pom.xml | 21 +++
.../apache/hudi/cli/HoodieTableHeaderFields.java | 36 ++++
.../apache/hudi/cli/commands/RepairsCommand.java | 21 ++-
.../cli/commands/AbstractShellIntegrationTest.java | 61 ++++++
.../HoodieTestCommitMetadataGenerator.java | 137 ++++++++++++++
.../hudi/cli/commands/TestRepairsCommand.java | 206 +++++++++++++++++++++
.../hudi/common/HoodieTestDataGenerator.java | 17 +-
.../apache/hudi/common/util/CollectionUtils.java | 111 +++++++++++
8 files changed, 601 insertions(+), 9 deletions(-)
diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml
index a5e358c..7b63e23 100644
--- a/hudi-cli/pom.xml
+++ b/hudi-cli/pom.xml
@@ -150,6 +150,27 @@
<artifactId>hudi-utilities_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
+ <!-- Test-scoped test-jar artifacts: expose test classes of sibling Hudi modules to the hudi-cli tests -->
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-common</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-client</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-utilities_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
<!-- Logging -->
<dependency>
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
new file mode 100644
index 0000000..2e3bc01
--- /dev/null
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli;
+
+/**
+ * Column header labels for the result tables printed by CLI commands.
+ * Keeping them in one place lets commands and their tests build the same header row.
+ */
+public class HoodieTableHeaderFields {
+
+ // Generic partition headers; the "Path" variant is derived from the base label.
+ public static final String HEADER_PARTITION = "Partition";
+ public static final String HEADER_PARTITION_PATH = HEADER_PARTITION + " Path";
+ /**
+ * Headers used by the repair command tables (partition metadata repair and property overwrite).
+ */
+ public static final String HEADER_METADATA_PRESENT = "Metadata Present?";
+ public static final String HEADER_REPAIR_ACTION = "Action";
+ public static final String HEADER_HOODIE_PROPERTY = "Property";
+ public static final String HEADER_OLD_VALUE = "Old Value";
+ public static final String HEADER_NEW_VALUE = "New Value";
+}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
index 7a65336..7feebed 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
@@ -25,10 +25,12 @@ import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.FSUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.avro.AvroRuntimeException;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.spark.launcher.SparkLauncher;
@@ -147,14 +149,21 @@ public class RepairsCommand implements CommandMarker {
public void removeCorruptedPendingCleanAction() {
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
- HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
-
- activeTimeline.filterInflightsAndRequested().getInstants().forEach(instant -> {
+ // Restrict the scan to clean instants: other pending actions (e.g. compaction) must not be deleted here.
+ HoodieTimeline cleanerTimeline = client.getActiveTimeline().getCleanerTimeline();
+ LOG.info("Inspecting pending clean metadata in timeline for corrupted files");
+ cleanerTimeline.filterInflightsAndRequested().getInstants().forEach(instant -> {
try {
CleanerUtils.getCleanerPlan(client, instant);
- } catch (IOException e) {
- LOG.warn("try to remove corrupted instant file: " + instant);
+ } catch (AvroRuntimeException e) {
+ LOG.warn("Corruption found. Trying to remove corrupted clean instant file: " + instant);
FSUtils.deleteInstantFile(client.getFs(), client.getMetaPath(), instant);
+ } catch (IOException ioe) {
+ // getMessage() may be null; a message-less IOException is not a known corruption, so rethrow it
+ // instead of failing with a NullPointerException that hides the real cause.
+ String message = ioe.getMessage();
+ if (message != null && message.contains("Not an Avro data file")) {
+ LOG.warn("Corruption found. Trying to remove corrupted clean instant file: " + instant);
+ FSUtils.deleteInstantFile(client.getFs(), client.getMetaPath(), instant);
+ } else {
+ throw new HoodieIOException(message, ioe);
+ }
}
});
}
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java
new file mode 100644
index 0000000..ad81af5
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.common.HoodieClientTestHarness;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.springframework.shell.Bootstrap;
+import org.springframework.shell.core.JLineShellComponent;
+
+/**
+ * Base class for CLI command tests. Boots the Spring Shell {@link Bootstrap} once per test class and
+ * hands the resulting {@link JLineShellComponent} to subclasses, while initializing and cleaning up the
+ * inherited test-harness resources around every test method.
+ */
+public abstract class AbstractShellIntegrationTest extends HoodieClientTestHarness {
+
+  private static JLineShellComponent shell;
+
+  /**
+   * Returns the shell component created in {@link #startup()}.
+   */
+  protected static JLineShellComponent getShell() {
+    return shell;
+  }
+
+  /**
+   * Creates the shell component shared by all tests of the class.
+   */
+  @BeforeClass
+  public static void startup() {
+    shell = new Bootstrap().getJLineShellComponent();
+  }
+
+  /**
+   * Stops the shared shell component after the last test of the class.
+   */
+  @AfterClass
+  public static void shutdown() {
+    shell.stop();
+  }
+
+  /**
+   * Initializes the harness resources before each test.
+   */
+  @Before
+  public void setup() throws Exception {
+    initResources();
+  }
+
+  /**
+   * Releases the harness resources after each test.
+   */
+  @After
+  public void teardown() throws Exception {
+    cleanupResources();
+  }
+}
\ No newline at end of file
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/HoodieTestCommitMetadataGenerator.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/HoodieTestCommitMetadataGenerator.java
new file mode 100644
index 0000000..7abad66
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/HoodieTestCommitMetadataGenerator.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTestUtils;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test helper that writes commit instant files containing generated {@link HoodieCommitMetadata}
+ * into a table's meta folder.
+ */
+public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {
+
+  // default commit metadata value
+  public static final String DEFAULT_PATH = "path";
+  public static final String DEFAULT_FILEID = "fileId";
+  public static final int DEFAULT_TOTAL_WRITE_BYTES = 50;
+  public static final String DEFAULT_PRE_COMMIT = "commit-1";
+  public static final int DEFAULT_NUM_WRITES = 10;
+  public static final int DEFAULT_NUM_UPDATE_WRITES = 15;
+  public static final int DEFAULT_TOTAL_LOG_BLOCKS = 1;
+  public static final int DEFAULT_TOTAL_LOG_RECORDS = 10;
+  public static final int DEFAULT_OTHER_VALUE = 0;
+  public static final String DEFAULT_NULL_VALUE = "null";
+
+  /**
+   * Create a commit file with default CommitMetadata.
+   */
+  public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration) {
+    createCommitFileWithMetadata(basePath, commitTime, configuration, Option.empty(), Option.empty());
+  }
+
+  /**
+   * Writes the completed, inflight and requested commit files for {@code commitTime}; each file holds the
+   * JSON form of a freshly generated commit metadata.
+   *
+   * @param basePath      table base path
+   * @param commitTime    instant time used for the file names and the generated data files
+   * @param configuration Hadoop configuration used to resolve the file system
+   * @param writes        number of writes per write stat, or empty for {@link #DEFAULT_NUM_WRITES}
+   * @param updates       number of update writes per write stat, or empty for {@link #DEFAULT_NUM_UPDATE_WRITES}
+   * @throws HoodieIOException if creating, writing or closing any of the files fails
+   */
+  public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
+      Option<Integer> writes, Option<Integer> updates) {
+    Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime),
+        HoodieTimeline.makeRequestedCommitFileName(commitTime))
+        .forEach(f -> {
+          Path commitFile = new Path(
+              basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f);
+          try {
+            FileSystem fs = FSUtils.getFs(basePath, configuration);
+            // try-with-resources closes the stream on every path; a failing close() is reported below too.
+            try (FSDataOutputStream os = fs.create(commitFile, true)) {
+              // Generate commitMetadata
+              HoodieCommitMetadata commitMetadata = generateCommitMetadata(basePath, commitTime, writes, updates);
+              // Write the metadata as UTF-8 bytes. writeBytes(String) would keep only the low byte of each
+              // char and silently corrupt any non-ASCII content.
+              os.write(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
+            }
+          } catch (IOException ioe) {
+            throw new HoodieIOException(ioe.getMessage(), ioe);
+          }
+        });
+  }
+
+  /**
+   * Generate commitMetadata in path.
+   */
+  public static HoodieCommitMetadata generateCommitMetadata(String basePath, String commitTime) throws IOException {
+    return generateCommitMetadata(basePath, commitTime, Option.empty(), Option.empty());
+  }
+
+  /**
+   * Creates one new data file in each of the first two default partitions and returns commit metadata
+   * with one write stat per created file.
+   *
+   * @param writes  number of writes per write stat, or empty for {@link #DEFAULT_NUM_WRITES}
+   * @param updates number of update writes per write stat, or empty for {@link #DEFAULT_NUM_UPDATE_WRITES}
+   * @throws IOException if a data file cannot be created
+   */
+  public static HoodieCommitMetadata generateCommitMetadata(String basePath, String commitTime,
+      Option<Integer> writes, Option<Integer> updates) throws IOException {
+    String file1P0C0 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, commitTime);
+    String file1P1C0 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_SECOND_PARTITION_PATH, commitTime);
+    // A plain map instead of double-brace initialization, which creates an anonymous HashMap subclass.
+    Map<String, List<String>> partitionToFilePaths = new HashMap<>();
+    partitionToFilePaths.put(DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0));
+    partitionToFilePaths.put(DEFAULT_SECOND_PARTITION_PATH, CollectionUtils.createImmutableList(file1P1C0));
+    return generateCommitMetadata(partitionToFilePaths, writes, updates);
+  }
+
+  /**
+   * Method to generate commit metadata: adds one write stat, filled with the default values, for every
+   * file path of every partition.
+   */
+  private static HoodieCommitMetadata generateCommitMetadata(Map<String, List<String>> partitionToFilePaths,
+      Option<Integer> writes, Option<Integer> updates) {
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+    partitionToFilePaths.forEach((key, value) -> value.forEach(f -> {
+      HoodieWriteStat writeStat = new HoodieWriteStat();
+      writeStat.setPartitionPath(key);
+      writeStat.setPath(DEFAULT_PATH);
+      writeStat.setFileId(DEFAULT_FILEID);
+      writeStat.setTotalWriteBytes(DEFAULT_TOTAL_WRITE_BYTES);
+      writeStat.setPrevCommit(DEFAULT_PRE_COMMIT);
+      writeStat.setNumWrites(writes.orElse(DEFAULT_NUM_WRITES));
+      writeStat.setNumUpdateWrites(updates.orElse(DEFAULT_NUM_UPDATE_WRITES));
+      writeStat.setTotalLogBlocks(DEFAULT_TOTAL_LOG_BLOCKS);
+      writeStat.setTotalLogRecords(DEFAULT_TOTAL_LOG_RECORDS);
+      metadata.addWriteStat(key, writeStat);
+    }));
+    return metadata;
+  }
+}
\ No newline at end of file
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
new file mode 100644
index 0000000..1192915
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.TimelineLayoutVersion;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.FSUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Test class for {@link RepairsCommand}.
+ */
+public class TestRepairsCommand extends AbstractShellIntegrationTest {
+
+  private String tablePath;
+
+  @Before
+  public void init() throws IOException {
+    String tableName = "test_table";
+    tablePath = basePath + File.separator + tableName;
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+  }
+
+  /**
+   * Creates the commit instant and the three default partition folders that the
+   * 'repair addpartitionmeta' tests operate on.
+   */
+  private void createCommitAndPartitions() throws IOException {
+    // create commit instant
+    Files.createFile(Paths.get(tablePath + "/.hoodie/100.commit"));
+
+    // create partition path
+    List<String> partitions = Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+        HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH);
+    for (String partition : partitions) {
+      assertTrue(fs.mkdirs(new Path(tablePath + File.separator + partition)));
+    }
+  }
+
+  /**
+   * Renders the table 'repair addpartitionmeta' is expected to print when every partition reports the
+   * same metadata state and repair action.
+   */
+  private static String expectedPartitionTable(List<String> partitions, String metadataPresent, String action) {
+    String[][] rows = partitions.stream()
+        .map(partition -> new String[] {partition, metadataPresent, action})
+        .toArray(String[][]::new);
+    return HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
+        HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
+  }
+
+  /**
+   * Test case for dry run 'repair addpartitionmeta'.
+   */
+  @Test
+  public void testAddPartitionMetaWithDryRun() throws IOException {
+    createCommitAndPartitions();
+
+    // default is dry run.
+    CommandResult cr = getShell().executeCommand("repair addpartitionmeta");
+    assertTrue(cr.isSuccess());
+
+    // expected all 'No'.
+    String expected = expectedPartitionTable(
+        FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath), "No", "None");
+
+    assertEquals(expected, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for real run 'repair addpartitionmeta'.
+   */
+  @Test
+  public void testAddPartitionMetaWithRealRun() throws IOException {
+    createCommitAndPartitions();
+
+    CommandResult cr = getShell().executeCommand("repair addpartitionmeta --dryrun false");
+    assertTrue(cr.isSuccess());
+
+    List<String> paths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath);
+    // the real run reports the state it found ('No' metadata) and the action it took ('Repaired')
+    assertEquals(expectedPartitionTable(paths, "No", "Repaired"), cr.getResult().toString());
+
+    // a following dry run must see the metadata written by the real run
+    cr = getShell().executeCommand("repair addpartitionmeta");
+    assertTrue(cr.isSuccess());
+
+    // after real run, Metadata is present now.
+    assertEquals(expectedPartitionTable(paths, "Yes", "None"), cr.getResult().toString());
+  }
+
+  /**
+   * Test case for 'repair overwrite-hoodie-props'.
+   */
+  @Test
+  public void testOverwriteHoodieProperties() throws IOException {
+    URL newProps = this.getClass().getClassLoader().getResource("table-config.properties");
+    assertNotNull("New property file must exist", newProps);
+
+    CommandResult cr = getShell().executeCommand("repair overwrite-hoodie-props --new-props-file " + newProps.getPath());
+    assertTrue(cr.isSuccess());
+
+    Map<String, String> oldProps = HoodieCLI.getTableMetaClient().getTableConfig().getProps();
+
+    // after overwrite, the stored value in .hoodie is equals to which read from properties.
+    Map<String, String> result = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().getProps();
+    Properties expectProps = new Properties();
+    // close the stream deterministically instead of leaking the file handle
+    try (FileInputStream in = new FileInputStream(new File(newProps.getPath()))) {
+      expectProps.load(in);
+    }
+
+    Map<String, String> expected = expectProps.entrySet().stream()
+        .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));
+    assertEquals(expected, result);
+
+    // check result
+    List<String> allPropsStr = Arrays.asList("hoodie.table.name", "hoodie.table.type",
+        "hoodie.archivelog.folder", "hoodie.timeline.layout.version");
+    String[][] rows = allPropsStr.stream().sorted().map(key -> new String[] {key,
+        oldProps.getOrDefault(key, null), result.getOrDefault(key, null)})
+        .toArray(String[][]::new);
+    String expect = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY,
+        HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows);
+
+    assertEquals(expect, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for 'repair corrupted clean files'.
+   */
+  @Test
+  public void testRemoveCorruptedPendingCleanAction() throws IOException {
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    Configuration conf = HoodieCLI.conf;
+
+    metaClient = HoodieCLI.getTableMetaClient();
+
+    // Create four requested files
+    for (int i = 100; i < 104; i++) {
+      String timestamp = String.valueOf(i);
+      // Write corrupted (empty) requested clean file. The repair command only inspects the cleaner
+      // timeline, so pending compaction files would never be removed and the final assertion would fail.
+      HoodieTestCommitMetadataGenerator.createEmptyCleanRequestedFile(tablePath, timestamp, conf);
+    }
+
+    // reload meta client
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    // first, there are four instants
+    assertEquals(4, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count());
+
+    CommandResult cr = getShell().executeCommand("repair corrupted clean files");
+    assertTrue(cr.isSuccess());
+
+    // reload meta client
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    assertEquals(0, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count());
+  }
+}
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
index 99bea63..0ec3bc8 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
@@ -216,12 +216,23 @@ public class HoodieTestDataGenerator {
});
}
- public static void createCompactionRequestedFile(String basePath, String commitTime, Configuration configuration)
+ /**
+ * Creates an empty requested-clean instant file for {@code instantTime} in the table's meta folder.
+ */
+ public static void createEmptyCleanRequestedFile(String basePath, String instantTime, Configuration configuration)
throws IOException {
Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
- + HoodieTimeline.makeRequestedCompactionFileName(commitTime));
+ + HoodieTimeline.makeRequestedCleanerFileName(instantTime));
+ createEmptyFile(basePath, commitFile, configuration);
+ }
+
+ /**
+ * Creates an empty requested-compaction instant file for {@code instantTime} in the table's meta folder.
+ */
+ public static void createCompactionRequestedFile(String basePath, String instantTime, Configuration configuration)
+ throws IOException {
+ Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ + HoodieTimeline.makeRequestedCompactionFileName(instantTime));
+ createEmptyFile(basePath, commitFile, configuration);
+ }
+
+ /**
+ * Creates {@code filePath} on the file system resolved from {@code basePath} and closes it right away
+ * without writing anything, leaving a zero-length file.
+ */
+ private static void createEmptyFile(String basePath, Path filePath, Configuration configuration) throws IOException {
FileSystem fs = FSUtils.getFs(basePath, configuration);
- FSDataOutputStream os = fs.create(commitFile, true);
+ FSDataOutputStream os = fs.create(filePath, true);
os.close();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
new file mode 100644
index 0000000..15e8bea
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Static helpers for comparing iterators and for building sets, lists and maps, mostly as
+ * unmodifiable views.
+ */
+public class CollectionUtils {
+
+  /**
+   * Compares two iterators element by element, in order.
+   *
+   * <p>Both iterators are consumed up to the first mismatch (or to the end), so they must not be reused
+   * by the caller afterwards.
+   *
+   * @return {@code true} if both iterators yield the same number of elements and every pair of
+   *         corresponding elements is equal according to {@link Objects#equals(Object, Object)}
+   */
+  public static boolean elementsEqual(Iterator<?> iterator1, Iterator<?> iterator2) {
+    while (iterator1.hasNext() && iterator2.hasNext()) {
+      if (!Objects.equals(iterator1.next(), iterator2.next())) {
+        return false;
+      }
+    }
+    // Equal only when both sides ran out together.
+    return !iterator1.hasNext() && !iterator2.hasNext();
+  }
+
+  /**
+   * Collects the given elements into a set.
+   */
+  @SafeVarargs
+  public static <T> Set<T> createSet(final T... elements) {
+    Stream<T> source = Stream.of(elements);
+    return source.collect(Collectors.toSet());
+  }
+
+  /**
+   * Returns an unmodifiable map holding exactly one entry.
+   */
+  public static <K,V> Map<K, V> createImmutableMap(final K key, final V value) {
+    Map<K, V> single = Collections.singletonMap(key, value);
+    return Collections.unmodifiableMap(single);
+  }
+
+  /**
+   * Returns an unmodifiable list with the given elements, in argument order.
+   */
+  @SafeVarargs
+  public static <T> List<T> createImmutableList(final T... elements) {
+    List<T> collected = Stream.of(elements).collect(Collectors.toList());
+    return Collections.unmodifiableList(collected);
+  }
+
+  /**
+   * Returns an unmodifiable view of the given map; the map itself is not copied.
+   */
+  public static <K,V> Map<K,V> createImmutableMap(final Map<K,V> map) {
+    return Collections.unmodifiableMap(map);
+  }
+
+  /**
+   * Returns an unmodifiable map built from the given pairs; when a key occurs more than once the
+   * last pair wins.
+   */
+  @SafeVarargs
+  public static <K,V> Map<K,V> createImmutableMap(final Pair<K,V>... elements) {
+    Map<K,V> entries = new HashMap<>();
+    for (int i = 0; i < elements.length; i++) {
+      Pair<K,V> entry = elements[i];
+      entries.put(entry.getLeft(), entry.getRight());
+    }
+    return Collections.unmodifiableMap(entries);
+  }
+
+  /**
+   * Returns an unmodifiable set with the given elements.
+   */
+  @SafeVarargs
+  public static <T> Set<T> createImmutableSet(final T... elements) {
+    Set<T> collected = createSet(elements);
+    return Collections.unmodifiableSet(collected);
+  }
+
+  /**
+   * Returns an unmodifiable view of the given set; the set itself is not copied.
+   */
+  public static <T> Set<T> createImmutableSet(final Set<T> set) {
+    return Collections.unmodifiableSet(set);
+  }
+
+  /**
+   * Returns an unmodifiable view of the given list; the list itself is not copied.
+   */
+  public static <T> List<T> createImmutableList(final List<T> list) {
+    return Collections.unmodifiableList(list);
+  }
+
+  /**
+   * Verifies that no element of the array is {@code null} and returns the array.
+   */
+  private static Object[] checkElementsNotNull(Object... array) {
+    return checkElementsNotNull(array, array.length);
+  }
+
+  /**
+   * Verifies that none of the first {@code length} elements is {@code null} and returns the array.
+   */
+  private static Object[] checkElementsNotNull(Object[] array, int length) {
+    int index = 0;
+    while (index < length) {
+      checkElementNotNull(array[index], index);
+      index++;
+    }
+    return array;
+  }
+
+  /**
+   * Returns {@code element}, or throws a {@link NullPointerException} naming {@code index} if it is null.
+   */
+  private static Object checkElementNotNull(Object element, int index) {
+    return Objects.requireNonNull(element, "Element is null at index " + index);
+  }
+}
\ No newline at end of file