You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2021/01/11 21:23:31 UTC
[hudi] branch master updated: [HUDI-1502] MOR rollback and restore
support for metadata sync (#2421)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e3d3677 [HUDI-1502] MOR rollback and restore support for metadata sync (#2421)
e3d3677 is described below
commit e3d3677b7e7899705b624925666317f0c074f7c7
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Mon Jan 11 16:23:13 2021 -0500
[HUDI-1502] MOR rollback and restore support for metadata sync (#2421)
- Adds a field to RollbackMetadata that captures the logs written for rollback blocks
- Adds a field to RollbackMetadata that captures new log files written by unsynced deltacommits
Co-authored-by: Vinoth Chandar <vi...@apache.org>
---
.../org/apache/hudi/io/HoodieAppendHandle.java | 1 +
.../AbstractMarkerBasedRollbackStrategy.java | 26 +++--
.../BaseMergeOnReadRollbackActionExecutor.java | 3 +-
.../hudi/table/action/rollback/RollbackUtils.java | 6 +-
.../rollback/ListingBasedRollbackHelper.java | 21 +++-
.../rollback/SparkMarkerBasedRollbackStrategy.java | 14 +++
.../hudi/metadata/TestHoodieBackedMetadata.java | 43 ++++---
.../rollback/TestMarkerBasedRollbackStrategy.java | 130 +++++++++++++++------
.../src/main/avro/HoodieRollbackMetadata.avsc | 16 ++-
.../org/apache/hudi/common/HoodieRollbackStat.java | 20 +++-
.../java/org/apache/hudi/common/fs/FSUtils.java | 13 ++-
.../table/timeline/TimelineMetadataUtils.java | 28 ++---
.../hudi/metadata/HoodieTableMetadataUtil.java | 34 ++++--
.../hudi/common/table/TestTimelineUtils.java | 27 ++---
.../table/view/TestIncrementalFSViewSync.java | 2 +-
15 files changed, 268 insertions(+), 116 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 3faac2e..c6ea7ba 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -164,6 +164,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
// Since the actual log file written to can be different based on when rollover happens, we use the
// base file to denote some log appends happened on a slice. writeToken will still fence concurrent
// writers.
+ // https://issues.apache.org/jira/browse/HUDI-1517
createMarkerFile(partitionPath, FSUtils.makeDataFileName(baseInstantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()));
this.writer = createLogWriter(fileSlice, baseInstantTime);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
index cb6fff9..cc596ba 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
@@ -53,9 +53,9 @@ public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecord
protected final HoodieWriteConfig config;
- private final String basePath;
+ protected final String basePath;
- private final String instantTime;
+ protected final String instantTime;
public AbstractMarkerBasedRollbackStrategy(HoodieTable<T, I, K, O> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
this.table = table;
@@ -90,6 +90,7 @@ public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecord
String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
String baseCommitTime = FSUtils.getCommitTime(baseFilePathForAppend.getName());
String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), new Path(basePath, appendBaseFilePath).getParent());
+ final Map<FileStatus, Long> writtenLogFileSizeMap = getWrittenLogFileSizeMap(partitionPath, baseCommitTime, fileId);
HoodieLogFormat.Writer writer = null;
try {
@@ -121,17 +122,26 @@ public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecord
}
}
- Map<FileStatus, Long> filesToNumBlocksRollback = Collections.emptyMap();
- if (config.useFileListingMetadata()) {
- // When metadata is enabled, the information of files appended to is required
- filesToNumBlocksRollback = Collections.singletonMap(
+ // the information of files appended to is required for metadata sync
+ Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
table.getMetaClient().getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
1L);
- }
return HoodieRollbackStat.newBuilder()
.withPartitionPath(partitionPath)
.withRollbackBlockAppendResults(filesToNumBlocksRollback)
- .build();
+ .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build();
+ }
+
+ /**
+ * Returns written log file size map for the respective baseCommitTime to assist in metadata table syncing.
+ * @param partitionPath partition path of interest
+ * @param baseCommitTime base commit time of interest
+ * @param fileId fileId of interest
+ * @return Map<FileStatus, File size>
+ * @throws IOException
+ */
+ protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPath, String baseCommitTime, String fileId) throws IOException {
+ return Collections.EMPTY_MAP;
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMergeOnReadRollbackActionExecutor.java
index 44cd5c8..2e75144 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMergeOnReadRollbackActionExecutor.java
@@ -28,7 +28,6 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -57,7 +56,7 @@ public abstract class BaseMergeOnReadRollbackActionExecutor<T extends HoodieReco
}
@Override
- protected List<HoodieRollbackStat> executeRollback() throws IOException {
+ protected List<HoodieRollbackStat> executeRollback() {
HoodieTimer rollbackTimer = new HoodieTimer();
rollbackTimer.startTimer();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
index 1ca0196..8ddb7e9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
@@ -74,14 +74,16 @@ public class RollbackUtils {
final List<String> successDeleteFiles = new ArrayList<>();
final List<String> failedDeleteFiles = new ArrayList<>();
final Map<FileStatus, Long> commandBlocksCount = new HashMap<>();
- final List<FileStatus> filesToRollback = new ArrayList<>();
+ final Map<FileStatus, Long> writtenLogFileSizeMap = new HashMap<>();
Option.ofNullable(stat1.getSuccessDeleteFiles()).ifPresent(successDeleteFiles::addAll);
Option.ofNullable(stat2.getSuccessDeleteFiles()).ifPresent(successDeleteFiles::addAll);
Option.ofNullable(stat1.getFailedDeleteFiles()).ifPresent(failedDeleteFiles::addAll);
Option.ofNullable(stat2.getFailedDeleteFiles()).ifPresent(failedDeleteFiles::addAll);
Option.ofNullable(stat1.getCommandBlocksCount()).ifPresent(commandBlocksCount::putAll);
Option.ofNullable(stat2.getCommandBlocksCount()).ifPresent(commandBlocksCount::putAll);
- return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount);
+ Option.ofNullable(stat1.getWrittenLogFileSizeMap()).ifPresent(writtenLogFileSizeMap::putAll);
+ Option.ofNullable(stat2.getWrittenLogFileSizeMap()).ifPresent(writtenLogFileSizeMap::putAll);
+ return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount, writtenLogFileSizeMap);
}
/**
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
index a614aeb..fcb3882 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -49,6 +50,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
import scala.Tuple2;
@@ -116,12 +118,22 @@ public class ListingBasedRollbackHelper implements Serializable {
.withDeletedFileResults(filesToDeletedStatus).build());
}
case APPEND_ROLLBACK_BLOCK: {
+ String fileId = rollbackRequest.getFileId().get();
+ String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
+
+ // collect all log files that is supposed to be deleted with this rollback
+ Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
+ FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
+ fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
+ .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
+
Writer writer = null;
try {
writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
- .withFileId(rollbackRequest.getFileId().get())
- .overBaseCommit(rollbackRequest.getLatestBaseInstant().get()).withFs(metaClient.getFs())
+ .withFileId(fileId)
+ .overBaseCommit(latestBaseInstant)
+ .withFs(metaClient.getFs())
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
// generate metadata
@@ -149,9 +161,11 @@ public class ListingBasedRollbackHelper implements Serializable {
metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
1L
);
+
return new Tuple2<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
- .withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
+ .withRollbackBlockAppendResults(filesToNumBlocksRollback)
+ .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build());
}
default:
throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
@@ -159,7 +173,6 @@ public class ListingBasedRollbackHelper implements Serializable {
});
}
-
/**
* Common method used for cleaning out base files under a partition path during rollback of a set of commits.
*/
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java
index 0598e54..0f36cb8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java
@@ -22,7 +22,10 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.IOType;
@@ -32,10 +35,14 @@ import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import java.io.IOException;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
import scala.Tuple2;
@@ -75,4 +82,11 @@ public class SparkMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> ext
throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
}
}
+
+ protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException {
+ // collect all log files that is supposed to be deleted with this rollback
+ return FSUtils.getAllLogFiles(table.getMetaClient().getFs(),
+ FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime)
+ .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
+ }
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
index 7e3dea4..3d770c7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
@@ -80,6 +80,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
+
private static final Logger LOG = LogManager.getLogger(TestHoodieBackedMetadata.class);
@TempDir
@@ -261,13 +262,11 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
/**
* Test rollback of various table operations sync to Metadata Table correctly.
*/
- //@ParameterizedTest
- //@EnumSource(HoodieTableType.class)
- //public void testRollbackOperations(HoodieTableType tableType) throws Exception {
- @Test
- public void testRollbackOperations() throws Exception {
+ @ParameterizedTest
+ @EnumSource(HoodieTableType.class)
+ public void testRollbackOperations(HoodieTableType tableType) throws Exception {
//FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed
- init(HoodieTableType.COPY_ON_WRITE);
+ init(tableType);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
@@ -371,13 +370,13 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
}
/**
- * Test when syncing rollback to metadata if the commit being rolled back has not been synced that essentially a no-op
- * occurs to metadata.
- * @throws Exception
+ * Test when syncing rollback to metadata if the commit being rolled back has not been synced that essentially a no-op occurs to metadata.
+ * Once explicit sync is called, metadata should match.
*/
- @Test
- public void testRollbackUnsyncedCommit() throws Exception {
- init(HoodieTableType.COPY_ON_WRITE);
+ @ParameterizedTest
+ @EnumSource(HoodieTableType.class)
+ public void testRollbackUnsyncedCommit(HoodieTableType tableType) throws Exception {
+ init(tableType);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
@@ -389,7 +388,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
}
-
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
// Commit with metadata disabled
@@ -401,6 +399,8 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
}
try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true))) {
+ assertFalse(metadata(client).isInSync());
+ client.syncTableMetadata();
validateMetadata(client);
}
}
@@ -528,8 +528,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
}
/**
- * Instants on Metadata Table should be archived as per config.
- * Metadata Table should be automatically compacted as per config.
+ * Instants on Metadata Table should be archived as per config. Metadata Table should be automatically compacted as per config.
*/
@Test
public void testCleaningArchivingAndCompaction() throws Exception {
@@ -752,8 +751,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
/**
* Validate the metadata tables contents to ensure it matches what is on the file system.
- *
- * @throws IOException
*/
private void validateMetadata(SparkRDDWriteClient client) throws IOException {
HoodieWriteConfig config = client.getConfig();
@@ -807,7 +804,19 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) {
LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray()));
LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray()));
+
+ for (String fileName : fsFileNames) {
+ if (!metadataFilenames.contains(fileName)) {
+ LOG.error(partition + "FsFilename " + fileName + " not found in Meta data");
+ }
+ }
+ for (String fileName : metadataFilenames) {
+ if (!fsFileNames.contains(fileName)) {
+ LOG.error(partition + "Metadata file " + fileName + " not found in original FS");
+ }
+ }
}
+
assertEquals(fsFileNames.size(), metadataFilenames.size(), "Files within partition " + partition + " should match");
assertTrue(fsFileNames.equals(metadataFilenames), "Files within partition " + partition + " should match");
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
index 7acff79..83e2b05 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
@@ -18,20 +18,33 @@
package org.apache.hudi.table.action.rollback;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
@@ -40,13 +53,21 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
+ private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with listing metadata enable={0}";
+
+ public static Stream<Arguments> configParams() {
+ return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of);
+ }
+
+ private HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
+
@BeforeEach
public void setUp() throws Exception {
initPath();
initSparkContexts();
initFileSystem();
- initMetaClient();
- initDFS();
+ initMetaClient(tableType);
+ initTestDataGenerator();
}
@AfterEach
@@ -55,7 +76,7 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
}
@Test
- public void testCopyOnWriteRollback() throws Exception {
+ public void testCopyOnWriteRollbackWithTestTable() throws Exception {
// given: wrote some base files and corresponding markers
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
String f0 = testTable.addRequestedCommit("000")
@@ -85,43 +106,78 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
assertEquals(1, stats.stream().mapToInt(r -> r.getFailedDeleteFiles().size()).sum());
}
- @Test
- public void testMergeOnReadRollback() throws Exception {
- // given: wrote some base + log files and corresponding markers
- HoodieTestTable testTable = HoodieTestTable.of(metaClient);
- String f2 = testTable.addRequestedDeltaCommit("000")
- .getFileIdsWithBaseFilesInPartitions("partA").get("partA");
- String f1 = testTable.addDeltaCommit("001")
- .withLogFile("partA", f2)
- .getFileIdsWithBaseFilesInPartitions("partB").get("partB");
- String f3 = "f3";
- String f4 = "f4";
- testTable.forDeltaCommit("001")
- .withMarkerFile("partB", f1, IOType.CREATE)
- .withMarkerFile("partA", f3, IOType.CREATE)
- .withMarkerFile("partA", f2, IOType.APPEND)
- .withMarkerFile("partB", f4, IOType.APPEND);
+ @Tag("functional")
+ @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
+ @MethodSource("configParams")
+ public void testCopyOnWriteRollback(boolean useFileListingMetadata) throws Exception {
+ HoodieWriteConfig writeConfig = getConfigBuilder().withRollbackUsingMarkers(true).withAutoCommit(false)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(useFileListingMetadata).build())
+ .withPath(basePath).build();
+
+ HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+ try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, writeConfig)) {
+ // rollback 2nd commit and ensure stats reflect the info.
+ List<HoodieRollbackStat> stats = testRun(useFileListingMetadata, writeConfig, writeClient);
+
+ assertEquals(3, stats.size());
+ for (HoodieRollbackStat stat : stats) {
+ assertEquals(1, stat.getSuccessDeleteFiles().size());
+ assertEquals(0, stat.getFailedDeleteFiles().size());
+ assertEquals(0, stat.getCommandBlocksCount().size());
+ assertEquals(0, stat.getWrittenLogFileSizeMap().size());
+ }
+ }
+ }
- // when
- List<HoodieRollbackStat> stats = new SparkMarkerBasedRollbackStrategy(HoodieSparkTable.create(getConfig(), context, metaClient), context, getConfig(), "002")
- .execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "001"));
+ @Tag("functional")
+ @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
+ @MethodSource("configParams")
+ public void testMergeOnReadRollback(boolean useFileListingMetadata) throws Exception {
+ // init MERGE_ON_READ_TABLE
+ tearDown();
+ tableType = HoodieTableType.MERGE_ON_READ;
+ setUp();
+
+ HoodieWriteConfig writeConfig = getConfigBuilder().withRollbackUsingMarkers(true).withAutoCommit(false)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(useFileListingMetadata).build())
+ .withPath(basePath).build();
+
+ HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+ try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, writeConfig)) {
+
+ // rollback 2nd commit and ensure stats reflect the info.
+ List<HoodieRollbackStat> stats = testRun(useFileListingMetadata, writeConfig, writeClient);
+
+ assertEquals(3, stats.size());
+ for (HoodieRollbackStat stat : stats) {
+ assertEquals(0, stat.getSuccessDeleteFiles().size());
+ assertEquals(0, stat.getFailedDeleteFiles().size());
+ assertEquals(1, stat.getCommandBlocksCount().size());
+ stat.getCommandBlocksCount().forEach((fileStatus, len) -> assertTrue(fileStatus.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())));
+ assertEquals(1, stat.getWrittenLogFileSizeMap().size());
+ stat.getWrittenLogFileSizeMap().forEach((fileStatus, len) -> assertTrue(fileStatus.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())));
+ }
+ }
+ }
- // then: ensure files are deleted, rollback block is appended (even if append does not exist)
- assertEquals(2, stats.size());
- // will have the log file
- FileStatus[] partBFiles = testTable.listAllFilesInPartition("partB");
- assertEquals(1, partBFiles.length);
- assertTrue(partBFiles[0].getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()));
- assertTrue(partBFiles[0].getLen() > 0);
+ private List<HoodieRollbackStat> testRun(boolean useFileListingMetadata, HoodieWriteConfig writeConfig, SparkRDDWriteClient writeClient) {
+ String newCommitTime = "001";
+ writeClient.startCommitWithTime(newCommitTime);
- FileStatus[] partAFiles = testTable.listAllFilesInPartition("partA");
- assertEquals(3, partAFiles.length);
- assertEquals(2, Stream.of(partAFiles).filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).count());
- assertEquals(1, Stream.of(partAFiles).filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).filter(f -> f.getLen() > 0).count());
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
+ JavaRDD<WriteStatus> writeStatuses = writeClient.insert(jsc.parallelize(records, 1), newCommitTime);
+ writeClient.commit(newCommitTime, writeStatuses);
- // only partB/f1_001 will be deleted
- assertEquals(1, stats.stream().mapToInt(r -> r.getSuccessDeleteFiles().size()).sum());
- // partA/f3_001 is non existent
- assertEquals(1, stats.stream().mapToInt(r -> r.getFailedDeleteFiles().size()).sum());
+ // Updates
+ newCommitTime = "002";
+ writeClient.startCommitWithTime(newCommitTime);
+ records = dataGen.generateUniqueUpdates(newCommitTime, 50);
+ writeStatuses = writeClient.upsert(jsc.parallelize(records, 1), newCommitTime);
+ writeStatuses.collect();
+
+ // rollback 2nd commit and ensure stats reflect the info.
+ return new SparkMarkerBasedRollbackStrategy(HoodieSparkTable.create(writeConfig, context, metaClient), context, writeConfig, "003")
+ .execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "002"));
}
+
}
diff --git a/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc b/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
index 069881e..a972bfd 100644
--- a/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
+++ b/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
@@ -31,18 +31,24 @@
{"name": "partitionPath", "type": "string"},
{"name": "successDeleteFiles", "type": {"type": "array", "items": "string"}},
{"name": "failedDeleteFiles", "type": {"type": "array", "items": "string"}},
- {"name": "appendFiles", "type": {
+ {"name": "rollbackLogFiles", "type": {
"type": "map",
- "doc": "Files to which append blocks were written",
+ "doc": "Files to which append blocks were written to capture rollback commit",
"values": {
"type": "long",
"doc": "Size of this file in bytes"
}
+ }},
+ {"name": "writtenLogFiles", "type": {
+ "type": "map",
+ "doc": "Log files written that were expected to be rolledback",
+ "values": {
+ "type": "long",
+ "doc": "Size of this file in bytes"
+ }
}}
]
- }
- }
- },
+ }}},
{
"name":"version",
"type":["int", "null"],
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java
index a3191fa..3e4ee34 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java
@@ -38,13 +38,16 @@ public class HoodieRollbackStat implements Serializable {
private final List<String> failedDeleteFiles;
// Count of HoodieLogFile to commandBlocks written for a particular rollback
private final Map<FileStatus, Long> commandBlocksCount;
+ // all log files with same base instant as instant to be rolledback
+ private final Map<FileStatus, Long> writtenLogFileSizeMap;
public HoodieRollbackStat(String partitionPath, List<String> successDeleteFiles, List<String> failedDeleteFiles,
- Map<FileStatus, Long> commandBlocksCount) {
+ Map<FileStatus, Long> commandBlocksCount, Map<FileStatus, Long> writtenLogFileSizeMap) {
this.partitionPath = partitionPath;
this.successDeleteFiles = successDeleteFiles;
this.failedDeleteFiles = failedDeleteFiles;
this.commandBlocksCount = commandBlocksCount;
+ this.writtenLogFileSizeMap = writtenLogFileSizeMap;
}
public Map<FileStatus, Long> getCommandBlocksCount() {
@@ -63,6 +66,10 @@ public class HoodieRollbackStat implements Serializable {
return failedDeleteFiles;
}
+ public Map<FileStatus, Long> getWrittenLogFileSizeMap() {
+ return writtenLogFileSizeMap;
+ }
+
public static HoodieRollbackStat.Builder newBuilder() {
return new Builder();
}
@@ -75,6 +82,7 @@ public class HoodieRollbackStat implements Serializable {
private List<String> successDeleteFiles;
private List<String> failedDeleteFiles;
private Map<FileStatus, Long> commandBlocksCount;
+ private Map<FileStatus, Long> writtenLogFileSizeMap;
private String partitionPath;
public Builder withDeletedFileResults(Map<FileStatus, Boolean> deletedFiles) {
@@ -100,6 +108,11 @@ public class HoodieRollbackStat implements Serializable {
return this;
}
+ public Builder withWrittenLogFileSizeMap(Map<FileStatus, Long> writtenLogFileSizeMap) {
+ this.writtenLogFileSizeMap = writtenLogFileSizeMap;
+ return this;
+ }
+
public Builder withPartitionPath(String partitionPath) {
this.partitionPath = partitionPath;
return this;
@@ -115,7 +128,10 @@ public class HoodieRollbackStat implements Serializable {
if (commandBlocksCount == null) {
commandBlocksCount = Collections.EMPTY_MAP;
}
- return new HoodieRollbackStat(partitionPath, successDeleteFiles, failedDeleteFiles, commandBlocksCount);
+ if (writtenLogFileSizeMap == null) {
+ writtenLogFileSizeMap = Collections.EMPTY_MAP;
+ }
+ return new HoodieRollbackStat(partitionPath, successDeleteFiles, failedDeleteFiles, commandBlocksCount, writtenLogFileSizeMap);
}
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 7978eed..1990c0a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -45,6 +45,7 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -426,10 +427,14 @@ public class FSUtils {
*/
public static Stream<HoodieLogFile> getAllLogFiles(FileSystem fs, Path partitionPath, final String fileId,
final String logFileExtension, final String baseCommitTime) throws IOException {
- return Arrays
- .stream(fs.listStatus(partitionPath,
- path -> path.getName().startsWith("." + fileId) && path.getName().contains(logFileExtension)))
- .map(HoodieLogFile::new).filter(s -> s.getBaseCommitTime().equals(baseCommitTime));
+ try {
+ return Arrays
+ .stream(fs.listStatus(partitionPath,
+ path -> path.getName().startsWith("." + fileId) && path.getName().contains(logFileExtension)))
+ .map(HoodieLogFile::new).filter(s -> s.getBaseCommitTime().equals(baseCommitTime));
+ } catch (FileNotFoundException e) {
+ return Stream.<HoodieLogFile>builder().build();
+ }
}
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
index 962d69d..9b419ca 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
@@ -18,17 +18,6 @@
package org.apache.hudi.common.table.timeline;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableByteArrayInput;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.hadoop.fs.FileStatus;
-
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
@@ -44,6 +33,17 @@ import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableByteArrayInput;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hadoop.fs.FileStatus;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;
@@ -71,10 +71,12 @@ public class TimelineMetadataUtils {
Map<String, HoodieRollbackPartitionMetadata> partitionMetadataBuilder = new HashMap<>();
int totalDeleted = 0;
for (HoodieRollbackStat stat : rollbackStats) {
- Map<String, Long> appendFiles = stat.getCommandBlocksCount().keySet().stream()
+ Map<String, Long> rollbackLogFiles = stat.getCommandBlocksCount().keySet().stream()
+ .collect(Collectors.toMap(f -> f.getPath().toString(), FileStatus::getLen));
+ Map<String, Long> probableLogFiles = stat.getWrittenLogFileSizeMap().keySet().stream()
.collect(Collectors.toMap(f -> f.getPath().toString(), FileStatus::getLen));
HoodieRollbackPartitionMetadata metadata = new HoodieRollbackPartitionMetadata(stat.getPartitionPath(),
- stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles(), appendFiles);
+ stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles(), rollbackLogFiles, probableLogFiles);
partitionMetadataBuilder.put(stat.getPartitionPath(), metadata);
totalDeleted += stat.getSuccessDeleteFiles().size();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index ed2a878..5942312 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -18,7 +18,6 @@
package org.apache.hudi.metadata;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -34,6 +33,8 @@ import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -44,6 +45,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.function.BiFunction;
import java.util.stream.Collectors;
import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
@@ -246,12 +248,13 @@ public class HoodieTableMetadataUtil {
rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
// Has this rollback produced new files?
- boolean hasAppendFiles = pm.getAppendFiles().values().stream().mapToLong(Long::longValue).sum() > 0;
+ boolean hasRollbackLogFiles = pm.getRollbackLogFiles() != null && !pm.getRollbackLogFiles().isEmpty();
+ boolean hasNonZeroRollbackLogFiles = hasRollbackLogFiles && pm.getRollbackLogFiles().values().stream().mapToLong(Long::longValue).sum() > 0;
// If commit being rolled back has not been synced to metadata table yet then there is no need to update metadata
boolean shouldSkip = lastSyncTs.isPresent()
&& HoodieTimeline.compareTimestamps(rollbackMetadata.getCommitsRollback().get(0), HoodieTimeline.GREATER_THAN, lastSyncTs.get());
- if (!hasAppendFiles && shouldSkip) {
+ if (!hasNonZeroRollbackLogFiles && shouldSkip) {
LOG.info(String.format("Skipping syncing of rollbackMetadata at %s, given metadata table is already synced upto to %s",
rollbackMetadata.getCommitsRollback().get(0), lastSyncTs.get()));
return;
@@ -269,16 +272,31 @@ public class HoodieTableMetadataUtil {
partitionToDeletedFiles.get(partition).addAll(deletedFiles);
}
- if (!pm.getAppendFiles().isEmpty()) {
+ BiFunction<Long, Long, Long> fileMergeFn = (oldSize, newSizeCopy) -> {
+ // if a file exists in both written log files and rollback log files, we want to pick the one that is higher
+ // as rollback file could have been updated after written log files are computed.
+ return oldSize > newSizeCopy ? oldSize : newSizeCopy;
+ };
+
+ if (hasRollbackLogFiles) {
if (!partitionToAppendedFiles.containsKey(partition)) {
partitionToAppendedFiles.put(partition, new HashMap<>());
}
// Extract appended file name from the absolute paths saved in getAppendFiles()
- pm.getAppendFiles().forEach((path, size) -> {
- partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, (oldSize, newSizeCopy) -> {
- return size + oldSize;
- });
+ pm.getRollbackLogFiles().forEach((path, size) -> {
+ partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, fileMergeFn);
+ });
+ }
+
+ if (pm.getWrittenLogFiles() != null && !pm.getWrittenLogFiles().isEmpty()) {
+ if (!partitionToAppendedFiles.containsKey(partition)) {
+ partitionToAppendedFiles.put(partition, new HashMap<>());
+ }
+
+ // Extract appended file name from the absolute paths saved in getWrittenLogFiles()
+ pm.getWrittenLogFiles().forEach((path, size) -> {
+ partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, fileMergeFn);
});
}
});
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index b79ffbb..18c0d3f 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -72,8 +72,8 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, ts1);
activeTimeline.createNewInstant(instant1);
// create replace metadata only with replaced file Ids (no new files created)
- activeTimeline.saveAsComplete(instant1,
- Option.of(getReplaceCommitMetadata(basePath, ts1, replacePartition,2, newFilePartition,0, Collections.emptyMap())));
+ activeTimeline.saveAsComplete(instant1,
+ Option.of(getReplaceCommitMetadata(basePath, ts1, replacePartition, 2, newFilePartition, 0, Collections.emptyMap())));
metaClient.reloadActiveTimeline();
List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("0", 10));
@@ -85,7 +85,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
activeTimeline.createNewInstant(instant2);
// create replace metadata only with replaced file Ids (no new files created)
activeTimeline.saveAsComplete(instant2,
- Option.of(getReplaceCommitMetadata(basePath, ts2, replacePartition,0, newFilePartition,3, Collections.emptyMap())));
+ Option.of(getReplaceCommitMetadata(basePath, ts2, replacePartition, 0, newFilePartition, 3, Collections.emptyMap())));
metaClient.reloadActiveTimeline();
partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
assertEquals(1, partitions.size());
@@ -96,7 +96,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
assertTrue(partitions.contains(replacePartition));
assertTrue(partitions.contains(newFilePartition));
}
-
+
@Test
public void testGetPartitions() throws IOException {
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
@@ -120,20 +120,20 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
// verify modified partitions included cleaned data
List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
assertEquals(5, partitions.size());
- assertEquals(partitions, Arrays.asList(new String[]{"0", "2", "3", "4", "5"}));
+ assertEquals(partitions, Arrays.asList(new String[] {"0", "2", "3", "4", "5"}));
partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
assertEquals(4, partitions.size());
- assertEquals(partitions, Arrays.asList(new String[]{"0", "2", "3", "4"}));
+ assertEquals(partitions, Arrays.asList(new String[] {"0", "2", "3", "4"}));
// verify only commit actions
partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
assertEquals(4, partitions.size());
- assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4", "5"}));
+ assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4", "5"}));
partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
assertEquals(3, partitions.size());
- assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4"}));
+ assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4"}));
}
@Test
@@ -181,10 +181,10 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
// verify modified partitions included cleaned data
List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
- assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4", "5"}));
+ assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4", "5"}));
partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
- assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4"}));
+ assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4"}));
}
@Test
@@ -201,7 +201,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
activeTimeline.createNewInstant(instant);
activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap())));
- ts = "1";
+ ts = "1";
instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
activeTimeline.createNewInstant(instant);
Map<String, String> extraMetadata = new HashMap<>();
@@ -241,7 +241,8 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
List<HoodieInstant> rollbacks = new ArrayList<>();
rollbacks.add(new HoodieInstant(false, actionType, commitTs));
- HoodieRollbackStat rollbackStat = new HoodieRollbackStat(partition, deletedFiles, Collections.emptyList(), Collections.emptyMap());
+ HoodieRollbackStat rollbackStat = new HoodieRollbackStat(partition, deletedFiles, Collections.emptyList(), Collections.emptyMap(),
+ Collections.EMPTY_MAP);
List<HoodieRollbackStat> rollbackStats = new ArrayList<>();
rollbackStats.add(rollbackStat);
return TimelineMetadataUtils.convertRollbackMetadata(commitTs, Option.empty(), rollbacks, rollbackStats);
@@ -264,7 +265,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
}
private byte[] getReplaceCommitMetadata(String basePath, String commitTs, String replacePartition, int replaceCount,
- String newFilePartition, int newFileCount, Map<String, String> extraMetadata)
+ String newFilePartition, int newFileCount, Map<String, String> extraMetadata)
throws IOException {
HoodieReplaceCommitMetadata commit = new HoodieReplaceCommitMetadata();
for (int i = 1; i <= newFileCount; i++) {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
index 5400dc4..146e0bb 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
@@ -556,7 +556,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
boolean isRestore) throws IOException {
Map<String, List<String>> partititonToFiles = deleteFiles(files);
List<HoodieRollbackStat> rollbackStats = partititonToFiles.entrySet().stream().map(e ->
- new HoodieRollbackStat(e.getKey(), e.getValue(), new ArrayList<>(), new HashMap<>())
+ new HoodieRollbackStat(e.getKey(), e.getValue(), new ArrayList<>(), new HashMap<>(), new HashMap<>())
).collect(Collectors.toList());
List<HoodieInstant> rollbacks = new ArrayList<>();