Posted to commits@hudi.apache.org by vi...@apache.org on 2019/10/25 02:34:14 UTC
[incubator-hudi] branch master updated: [HUDI-169] Speed up rolling back of instants (#968)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c23da69 [HUDI-169] Speed up rolling back of instants (#968)
c23da69 is described below
commit c23da694ccbbf1f0ebcc5ed770dd56c74c1e06d7
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Thu Oct 24 19:34:00 2019 -0700
[HUDI-169] Speed up rolling back of instants (#968)
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 13 +
.../apache/hudi/table/HoodieCopyOnWriteTable.java | 70 ++---
.../apache/hudi/table/HoodieMergeOnReadTable.java | 328 +++++++++------------
.../org/apache/hudi/table/RollbackExecutor.java | 233 +++++++++++++++
.../org/apache/hudi/table/RollbackRequest.java | 109 +++++++
5 files changed, 506 insertions(+), 247 deletions(-)
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 3597556..16b223c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -51,6 +51,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
+ private static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
+ private static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
private static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
private static final String DEFAULT_WRITE_BUFFER_LIMIT_BYTES = String.valueOf(4 * 1024 * 1024);
private static final String COMBINE_BEFORE_INSERT_PROP = "hoodie.combine.before.insert";
@@ -141,6 +143,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Integer.parseInt(props.getProperty(UPSERT_PARALLELISM));
}
+ public int getRollbackParallelism() {
+ return Integer.parseInt(props.getProperty(ROLLBACK_PARALLELISM));
+ }
+
+
public int getWriteBufferLimitBytes() {
return Integer.parseInt(props.getProperty(WRITE_BUFFER_LIMIT_BYTES, DEFAULT_WRITE_BUFFER_LIMIT_BYTES));
}
@@ -562,6 +569,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
+ public Builder withRollbackParallelism(int rollbackParallelism) {
+ props.setProperty(ROLLBACK_PARALLELISM, String.valueOf(rollbackParallelism));
+ return this;
+ }
+
public Builder withWriteBufferLimitBytes(int writeBufferLimit) {
props.setProperty(WRITE_BUFFER_LIMIT_BYTES, String.valueOf(writeBufferLimit));
return this;
@@ -651,6 +663,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
setDefaultOnCondition(props, !props.containsKey(BULKINSERT_PARALLELISM), BULKINSERT_PARALLELISM,
DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
+ setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM, DEFAULT_ROLLBACK_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_INSERT_PROP), COMBINE_BEFORE_INSERT_PROP,
DEFAULT_COMBINE_BEFORE_INSERT);
setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_UPSERT_PROP), COMBINE_BEFORE_UPSERT_PROP,
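[Editor's note: a minimal usage sketch of the new knob. HoodieWriteConfig.newBuilder() and withPath(...) are the usual entry points and are assumed here, since they sit outside this diff; the base path is hypothetical.]

    // Cap rollback fan-out at 50 Spark tasks; left unset, the builder
    // default above (hoodie.rollback.parallelism) applies instead.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample-table")   // hypothetical base path
        .withRollbackParallelism(50)            // sets hoodie.rollback.parallelism
        .build();
    int rollbackParallelism = writeConfig.getRollbackParallelism(); // 50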
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index 6644f14..e18578d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -32,10 +32,8 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.HoodieCleanStat;
@@ -74,7 +72,6 @@ import org.apache.parquet.hadoop.ParquetReader;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
@@ -294,45 +291,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
}
- /**
- * Common method used for cleaning out parquet files under a partition path during rollback of a set of commits
- */
- protected Map<FileStatus, Boolean> deleteCleanedFiles(Map<FileStatus, Boolean> results, String partitionPath,
- PathFilter filter) throws IOException {
- logger.info("Cleaning path " + partitionPath);
- FileSystem fs = getMetaClient().getFs();
- FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
- for (FileStatus file : toBeDeleted) {
- boolean success = fs.delete(file.getPath(), false);
- results.put(file, success);
- logger.info("Delete file " + file.getPath() + "\t" + success);
- }
- return results;
- }
-
- /**
- * Common method used for cleaning out parquet files under a partition path during rollback of a set of commits
- */
- protected Map<FileStatus, Boolean> deleteCleanedFiles(Map<FileStatus, Boolean> results, String commit,
- String partitionPath) throws IOException {
- logger.info("Cleaning path " + partitionPath);
- FileSystem fs = getMetaClient().getFs();
- PathFilter filter = (path) -> {
- if (path.toString().contains(".parquet")) {
- String fileCommitTime = FSUtils.getCommitTime(path.getName());
- return commit.equals(fileCommitTime);
- }
- return false;
- };
- FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
- for (FileStatus file : toBeDeleted) {
- boolean success = fs.delete(file.getPath(), false);
- results.put(file, success);
- logger.info("Delete file " + file.getPath() + "\t" + success);
- }
- return results;
- }
-
@Override
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, String commit, boolean deleteInstants)
throws IOException {
@@ -342,30 +300,38 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
this.getInflightCommitTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
// Atomically unpublish the commits
if (!inflights.contains(commit)) {
+ logger.info("Unpublishing " + commit);
activeTimeline.revertToInflight(new HoodieInstant(false, actionType, commit));
}
- logger.info("Unpublished " + commit);
+
+ HoodieInstant instantToRollback = new HoodieInstant(false, actionType, commit);
+ Long startTime = System.currentTimeMillis();
// delete all the data files for this commit
logger.info("Clean out all parquet files generated for commit: " + commit);
+ List<RollbackRequest> rollbackRequests = generateRollbackRequests(instantToRollback);
+
+ //TODO: We need to persist this as rollback workload and use it in case of partial failures
List<HoodieRollbackStat> stats =
- jsc.parallelize(FSUtils.getAllPartitionPaths(metaClient.getFs(), getMetaClient().getBasePath(),
- config.shouldAssumeDatePartitioning())).map((Function<String, HoodieRollbackStat>) partitionPath -> {
- // Scan all partitions files with this commit time
- final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
- deleteCleanedFiles(filesToDeletedStatus, commit, partitionPath);
- return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
- .withDeletedFileResults(filesToDeletedStatus).build();
- }).collect();
+ new RollbackExecutor(metaClient, config).performRollback(jsc, instantToRollback, rollbackRequests);
// Delete Inflight instant if enabled
deleteInflightInstant(deleteInstants, activeTimeline, new HoodieInstant(true, actionType, commit));
+ logger.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
return stats;
}
+ private List<RollbackRequest> generateRollbackRequests(HoodieInstant instantToRollback)
+ throws IOException {
+ return FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
+ config.shouldAssumeDatePartitioning()).stream().map(partitionPath -> {
+ return RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback);
+ }).collect(Collectors.toList());
+ }
+
/**
* Delete Inflight instant if enabled
- *
+ *
* @param deleteInstant Enable Deletion of Inflight instant
* @param activeTimeline Hoodie active timeline
* @param instantToBeDeleted Instant to be deleted
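[Editor's note: net effect of the HoodieCopyOnWriteTable change is that rollback splits into request generation plus parallel execution. A condensed, illustrative sketch of the new flow, using only identifiers from this diff; it is not the exact method bodies.]

    // 1. One DELETE_DATA_AND_LOG_FILES request per partition
    List<RollbackRequest> rollbackRequests = FSUtils
        .getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
            config.shouldAssumeDatePartitioning()).stream()
        .map(partitionPath -> RollbackRequest
            .createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback))
        .collect(Collectors.toList());

    // 2. Execute all requests in parallel and gather per-partition stats
    List<HoodieRollbackStat> stats =
        new RollbackExecutor(metaClient, config).performRollback(jsc, instantToRollback, rollbackRequests);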
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index f9b4141..8dcb3bf 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -19,22 +19,17 @@
package org.apache.hudi.table;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.HoodieRollbackStat;
@@ -47,11 +42,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.SyncableFileSystemView;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.FSUtils;
@@ -59,10 +49,8 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.func.MergeOnReadLazyInsertIterable;
-import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.HoodieAppendHandle;
import org.apache.hudi.io.compact.HoodieRealtimeTableCompactor;
import org.apache.log4j.LogManager;
@@ -70,7 +58,6 @@ import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
/**
* Implementation of a more real-time read-optimized Hoodie Table where
@@ -180,7 +167,6 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
@Override
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, String commit, boolean deleteInstants)
throws IOException {
-
// At the moment, MOR table type does not support bulk nested rollbacks. Nested rollbacks is an experimental
// feature that is expensive. To perform nested rollbacks, initiate multiple requests of client.rollback
// (commitToRollback).
@@ -198,140 +184,121 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
}
logger.info("Unpublished " + commit);
Long startTime = System.currentTimeMillis();
+ List<RollbackRequest> rollbackRequests = generateRollbackRequests(jsc, instantToRollback);
+ //TODO: We need to persist this as rollback workload and use it in case of partial failures
List<HoodieRollbackStat> allRollbackStats =
- jsc.parallelize(FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
- config.shouldAssumeDatePartitioning())).map((Function<String, HoodieRollbackStat>) partitionPath -> {
- HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload();
- HoodieRollbackStat hoodieRollbackStats = null;
- // Need to put the path filter here since Filter is not serializable
- // PathFilter to get all parquet files and log files that need to be deleted
- PathFilter filter = (path) -> {
- if (path.toString().contains(".parquet")) {
- String fileCommitTime = FSUtils.getCommitTime(path.getName());
- return commit.equals(fileCommitTime);
- } else if (path.toString().contains(".log")) {
- // Since the baseCommitTime is the only commit for new log files, it's okay here
- String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
- return commit.equals(fileCommitTime);
- }
- return false;
- };
-
- final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
-
- switch (instantToRollback.getAction()) {
- case HoodieTimeline.COMMIT_ACTION:
- try {
- // Rollback of a commit should delete the newly created parquet files along with any log
- // files created with this as baseCommit. This is required to support multi-rollbacks in a MOR
- // table.
- super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);
- hoodieRollbackStats = HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
- .withDeletedFileResults(filesToDeletedStatus).build();
- break;
- } catch (IOException io) {
- throw new UncheckedIOException("Failed to rollback for commit " + commit, io);
- }
- case HoodieTimeline.COMPACTION_ACTION:
- try {
- // If there is no delta commit present after the current commit (if compaction), no action, else we
- // need to make sure that a compaction commit rollback also deletes any log files written as part of
- // the
- // succeeding deltacommit.
- boolean higherDeltaCommits = !activeTimeline.getDeltaCommitTimeline().filterCompletedInstants()
- .findInstantsAfter(commit, 1).empty();
- if (higherDeltaCommits) {
- // Rollback of a compaction action with no higher deltacommit means that the compaction is
- // scheduled
- // and has not yet finished. In this scenario we should delete only the newly created parquet
- // files
- // and not corresponding base commit log files created with this as baseCommit since updates would
- // have been written to the log files.
- super.deleteCleanedFiles(filesToDeletedStatus, commit, partitionPath);
- hoodieRollbackStats = HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
- .withDeletedFileResults(filesToDeletedStatus).build();
- } else {
- // No deltacommits present after this compaction commit (inflight or requested). In this case, we
- // can also delete any log files that were created with this compaction commit as base
- // commit.
- super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);
- hoodieRollbackStats = HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
- .withDeletedFileResults(filesToDeletedStatus).build();
- }
- break;
- } catch (IOException io) {
- throw new UncheckedIOException("Failed to rollback for commit " + commit, io);
- }
- case HoodieTimeline.DELTA_COMMIT_ACTION:
- // --------------------------------------------------------------------------------------------------
- // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal
- // --------------------------------------------------------------------------------------------------
- // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries.
- // In
- // this scenario we would want to delete these log files.
- // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario,
- // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks.
- // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is
- // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime
- // and
- // and hence will end up deleting these log files. This is done so there are no orphan log files
- // lying around.
- // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions
- // taken in this scenario is a combination of (A.2) and (A.3)
- // ---------------------------------------------------------------------------------------------------
- // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal
- // ---------------------------------------------------------------------------------------------------
- // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no
- // entries.
- // In this scenario, we delete all the parquet files written for the failed commit.
- // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In
- // this scenario, perform (A.1) and for updates written to log files, write rollback blocks.
- // (B.3) Rollback triggered for first commit - Same as (B.1)
- // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files
- // as well if the base parquet file gets deleted.
- try {
- HoodieCommitMetadata commitMetadata =
- HoodieCommitMetadata.fromBytes(
- metaClient.getCommitTimeline().getInstantDetails(new HoodieInstant(true,
- instantToRollback.getAction(), instantToRollback.getTimestamp())).get(),
- HoodieCommitMetadata.class);
-
- // read commit file and (either append delete blocks or delete file)
- Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>();
-
- // In case all data was inserts and the commit failed, delete the file belonging to that commit
- // We do not know fileIds for inserts (first inserts are either log files or parquet files),
- // delete all files for the corresponding failed commit, if present (same as COW)
- super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);
- final Set<String> deletedFiles = filesToDeletedStatus.entrySet().stream().map(entry -> {
- Path filePath = entry.getKey().getPath();
- return FSUtils.getFileIdFromFilePath(filePath);
- }).collect(Collectors.toSet());
-
- // append rollback blocks for updates
- if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
- hoodieRollbackStats = rollback(index, partitionPath, commit, commitMetadata, filesToDeletedStatus,
- filesToNumBlocksRollback, deletedFiles);
- }
- break;
- } catch (IOException io) {
- throw new UncheckedIOException("Failed to rollback for commit " + commit, io);
- }
- default:
- break;
- }
- return hoodieRollbackStats;
- }).filter(Objects::nonNull).collect();
-
+ new RollbackExecutor(metaClient, config).performRollback(jsc, instantToRollback, rollbackRequests);
// Delete Inflight instants if enabled
- deleteInflightInstant(deleteInstants, this.getActiveTimeline(),
- new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp()));
+ deleteInflightInstant(deleteInstants, this.getActiveTimeline(), new HoodieInstant(true, instantToRollback
+ .getAction(), instantToRollback.getTimestamp()));
- logger.debug("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
+ logger.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
return allRollbackStats;
}
+ /**
+ * Generate all rollback requests that need to be performed for rolling back this instant, without actually
+ * performing the rollback.
+ *
+ * @param jsc JavaSparkContext
+ * @param instantToRollback Instant to Rollback
+ * @return list of rollback requests
+ * @throws IOException
+ */
+ private List<RollbackRequest> generateRollbackRequests(JavaSparkContext jsc, HoodieInstant instantToRollback)
+ throws IOException {
+ String commit = instantToRollback.getTimestamp();
+ List<String> partitions = FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
+ config.shouldAssumeDatePartitioning());
+ int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
+ return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions))
+ .flatMap(partitionPath -> {
+ HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload();
+ List<RollbackRequest> partitionRollbackRequests = new ArrayList<>();
+ switch (instantToRollback.getAction()) {
+ case HoodieTimeline.COMMIT_ACTION:
+ logger.info("Rolling back commit action. There are higher delta commits. So only rolling back this "
+ + "instant");
+ partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(
+ partitionPath, instantToRollback));
+ break;
+ case HoodieTimeline.COMPACTION_ACTION:
+ // If there is no delta commit present after the current commit (if compaction), no action, else we
+ // need to make sure that a compaction commit rollback also deletes any log files written as part of the
+ // succeeding deltacommit.
+ boolean higherDeltaCommits = !activeTimeline.getDeltaCommitTimeline()
+ .filterCompletedInstants().findInstantsAfter(commit, 1).empty();
+ if (higherDeltaCommits) {
+ // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled
+ // and has not yet finished. In this scenario we should delete only the newly created parquet files
+ // and not corresponding base commit log files created with this as baseCommit since updates would
+ // have been written to the log files.
+ logger.info("Rolling back compaction. There are higher delta commits. So only deleting data files");
+ partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(
+ partitionPath, instantToRollback));
+ } else {
+ // No deltacommits present after this compaction commit (inflight or requested). In this case, we
+ // can also delete any log files that were created with this compaction commit as base
+ // commit.
+ logger.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and"
+ + " log files");
+ partitionRollbackRequests.add(
+ RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath,
+ instantToRollback));
+ }
+ break;
+ case HoodieTimeline.DELTA_COMMIT_ACTION:
+ // --------------------------------------------------------------------------------------------------
+ // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal
+ // --------------------------------------------------------------------------------------------------
+ // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In
+ // this scenario we would want to delete these log files.
+ // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario,
+ // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks.
+ // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is
+ // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime
+ // and hence will end up deleting these log files. This is done so there are no orphan log files
+ // lying around.
+ // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions
+ // taken in this scenario is a combination of (A.2) and (A.3)
+ // ---------------------------------------------------------------------------------------------------
+ // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal
+ // ---------------------------------------------------------------------------------------------------
+ // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries.
+ // In this scenario, we delete all the parquet files written for the failed commit.
+ // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In
+ // this scenario, perform (A.1) and for updates written to log files, write rollback blocks.
+ // (B.3) Rollback triggered for first commit - Same as (B.1)
+ // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files
+ // as well if the base parquet file gets deleted.
+ try {
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+ metaClient.getCommitTimeline().getInstantDetails(
+ new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp()))
+ .get(), HoodieCommitMetadata.class);
+
+ // In case all data was inserts and the commit failed, delete the file belonging to that commit
+ // We do not know fileIds for inserts (first inserts are either log files or parquet files),
+ // delete all files for the corresponding failed commit, if present (same as COW)
+ partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(
+ partitionPath, instantToRollback));
+
+ // append rollback blocks for updates
+ if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
+ partitionRollbackRequests
+ .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata));
+ }
+ break;
+ } catch (IOException io) {
+ throw new UncheckedIOException("Failed to collect rollback actions for commit " + commit, io);
+ }
+ default:
+ break;
+ }
+ return partitionRollbackRequests.iterator();
+ }).filter(Objects::nonNull).collect();
+ }
+
@Override
public void finalizeWrite(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats)
throws HoodieIOException {
@@ -450,19 +417,10 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
}
}
- private Map<HeaderMetadataType, String> generateHeader(String commit) {
- // generate metadata
- Map<HeaderMetadataType, String> header = Maps.newHashMap();
- header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
- header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit);
- header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,
- String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
- return header;
- }
+ private List<RollbackRequest> generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant,
+ HoodieCommitMetadata commitMetadata) {
+ Preconditions.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
- private HoodieRollbackStat rollback(HoodieIndex hoodieIndex, String partitionPath, String commit,
- HoodieCommitMetadata commitMetadata, final Map<FileStatus, Boolean> filesToDeletedStatus,
- Map<FileStatus, Long> filesToNumBlocksRollback, Set<String> deletedFiles) {
// wStat.getPrevCommit() might not give the right commit time in the following
// scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be
// used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
@@ -470,47 +428,27 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
// baseCommit always by listing the file slice
Map<String, String> fileIdToBaseCommitTimeForLogMap = this.getRTFileSystemView().getLatestFileSlices(partitionPath)
.collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
- commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> {
- // Filter out stats without prevCommit since they are all inserts
- return wStat != null && wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT && wStat.getPrevCommit() != null
- && !deletedFiles.contains(wStat.getFileId());
- }).forEach(wStat -> {
- Writer writer = null;
- String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
- if (null != baseCommitTime) {
- boolean success = false;
- try {
- writer = HoodieLogFormat.newWriterBuilder()
- .onParentPath(FSUtils.getPartitionPath(this.getMetaClient().getBasePath(), partitionPath))
- .withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime).withFs(this.metaClient.getFs())
- .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
- // generate metadata
- Map<HeaderMetadataType, String> header = generateHeader(commit);
- // if update belongs to an existing log file
- writer = writer.appendBlock(new HoodieCommandBlock(header));
- success = true;
- } catch (IOException | InterruptedException io) {
- throw new HoodieRollbackException("Failed to rollback for commit " + commit, io);
- } finally {
- try {
- if (writer != null) {
- writer.close();
- }
- if (success) {
- // This step is intentionally done after writer is closed. Guarantees that
- // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
- // cloud-storage : HUDI-168
- filesToNumBlocksRollback.put(this.getMetaClient().getFs().getFileStatus(writer.getLogFile().getPath()),
- 1L);
- }
- } catch (IOException io) {
- throw new UncheckedIOException(io);
+ return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream()
+ .filter(wStat -> {
+
+ // Filter out stats without prevCommit since they are all inserts
+ boolean validForRollback = (wStat != null) && (wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT)
+ && (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId());
+
+ if (validForRollback) {
+ // For sanity, the base instant of a log file can never be after the instant being rolled back
+ Preconditions.checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(
+ wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL));
}
- }
- }
- });
- return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath).withDeletedFileResults(filesToDeletedStatus)
- .withRollbackBlockAppendResults(filesToNumBlocksRollback).build();
- }
+ return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(
+ // Base Ts should be strictly less. If equal (for inserts-to-logs), the caller employs another option
+ // to delete and we should not step on it
+ wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER);
+ }).map(wStat -> {
+ String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
+ return RollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(),
+ baseCommitTime, rollbackInstant);
+ }).collect(Collectors.toList());
+ }
}
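[Editor's note: to make the delta-commit filter above concrete, suppose a file group's log files sit on base commit 20191023 and delta commit 20191024 is being rolled back. The compareTimestamps calls mirror those in the diff; the timestamp literals are illustrative.]

    String baseCommit = "20191023";  // from getLatestFileSlices(partitionPath)
    String rollbackTs = "20191024";  // instant being rolled back

    // Sanity: a log file's base instant can never be after the instant being rolled back
    Preconditions.checkArgument(HoodieTimeline.compareTimestamps(
        baseCommit, rollbackTs, HoodieTimeline.LESSER_OR_EQUAL));

    // Strictly less => an APPEND_ROLLBACK_BLOCK request is generated for the file group.
    // If the two are equal (inserts written straight to logs), the delete request issued
    // earlier already covers the file, so no rollback block is appended.
    boolean appendBlock = HoodieTimeline.compareTimestamps(
        baseCommit, rollbackTs, HoodieTimeline.LESSER);  // true here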
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java
new file mode 100644
index 0000000..8bdf371
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.FSUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
+
+/**
+ * Performs Rollback of Hoodie Tables
+ */
+public class RollbackExecutor implements Serializable {
+
+ private static Logger logger = LogManager.getLogger(RollbackExecutor.class);
+
+ private final HoodieTableMetaClient metaClient;
+ private final HoodieWriteConfig config;
+
+ public RollbackExecutor(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
+ this.metaClient = metaClient;
+ this.config = config;
+ }
+
+ /**
+ * Performs all rollback actions that we have collected, in parallel.
+ */
+ public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc,
+ HoodieInstant instantToRollback, List<RollbackRequest> rollbackRequests) {
+
+ SerializablePathFilter filter = (path) -> {
+ if (path.toString().contains(".parquet")) {
+ String fileCommitTime = FSUtils.getCommitTime(path.getName());
+ return instantToRollback.getTimestamp().equals(fileCommitTime);
+ } else if (path.toString().contains(".log")) {
+ // Since the baseCommitTime is the only commit for new log files, it's okay here
+ String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
+ return instantToRollback.getTimestamp().equals(fileCommitTime);
+ }
+ return false;
+ };
+
+ int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
+ return jsc.parallelize(rollbackRequests, sparkPartitions).mapToPair(rollbackRequest -> {
+ final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
+ switch (rollbackRequest.getRollbackAction()) {
+ case DELETE_DATA_FILES_ONLY: {
+ deleteCleanedFiles(metaClient, config, filesToDeletedStatus, instantToRollback.getTimestamp(),
+ rollbackRequest.getPartitionPath());
+ return new Tuple2<String, HoodieRollbackStat>(rollbackRequest.getPartitionPath(),
+ HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+ .withDeletedFileResults(filesToDeletedStatus).build());
+ }
+ case DELETE_DATA_AND_LOG_FILES: {
+ deleteCleanedFiles(metaClient, config, filesToDeletedStatus, rollbackRequest.getPartitionPath(), filter);
+ return new Tuple2<String, HoodieRollbackStat>(rollbackRequest.getPartitionPath(),
+ HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+ .withDeletedFileResults(filesToDeletedStatus).build());
+ }
+ case APPEND_ROLLBACK_BLOCK: {
+ Writer writer = null;
+ boolean success = false;
+ try {
+ writer = HoodieLogFormat.newWriterBuilder().onParentPath(
+ FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
+ .withFileId(rollbackRequest.getFileId().get())
+ .overBaseCommit(rollbackRequest.getLatestBaseInstant().get())
+ .withFs(metaClient.getFs())
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+
+ // generate metadata
+ Map<HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
+ // if update belongs to an existing log file
+ writer = writer.appendBlock(new HoodieCommandBlock(header));
+ success = true;
+ } catch (IOException | InterruptedException io) {
+ throw new HoodieRollbackException(
+ "Failed to rollback for instant " + instantToRollback, io);
+ } finally {
+ try {
+ if (writer != null) {
+ writer.close();
+ }
+ } catch (IOException io) {
+ throw new UncheckedIOException(io);
+ }
+ }
+
+ // This step is intentionally done after writer is closed. Guarantees that
+ // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
+ // cloud-storage : HUDI-168
+ Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>();
+ filesToNumBlocksRollback.put(metaClient.getFs()
+ .getFileStatus(writer.getLogFile().getPath()), 1L);
+ return new Tuple2<String, HoodieRollbackStat>(rollbackRequest.getPartitionPath(),
+ HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+ .withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
+ }
+ default:
+ throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
+ }
+ }).reduceByKey(this::mergeRollbackStat).map(Tuple2::_2).collect();
+ }
+
+ /**
+ * Helper to merge 2 rollback-stats for a given partition
+ *
+ * @param stat1 HoodieRollbackStat
+ * @param stat2 HoodieRollbackStat
+ * @return Merged HoodieRollbackStat
+ */
+ private HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, HoodieRollbackStat stat2) {
+ Preconditions.checkArgument(stat1.getPartitionPath().equals(stat2.getPartitionPath()));
+ final List<String> successDeleteFiles = new ArrayList<>();
+ final List<String> failedDeleteFiles = new ArrayList<>();
+ final Map<FileStatus, Long> commandBlocksCount = new HashMap<>();
+
+ if (stat1.getSuccessDeleteFiles() != null) {
+ successDeleteFiles.addAll(stat1.getSuccessDeleteFiles());
+ }
+ if (stat2.getSuccessDeleteFiles() != null) {
+ successDeleteFiles.addAll(stat2.getSuccessDeleteFiles());
+ }
+ if (stat1.getFailedDeleteFiles() != null) {
+ failedDeleteFiles.addAll(stat1.getFailedDeleteFiles());
+ }
+ if (stat2.getFailedDeleteFiles() != null) {
+ failedDeleteFiles.addAll(stat2.getFailedDeleteFiles());
+ }
+ if (stat1.getCommandBlocksCount() != null) {
+ commandBlocksCount.putAll(stat1.getCommandBlocksCount());
+ }
+ if (stat2.getCommandBlocksCount() != null) {
+ commandBlocksCount.putAll(stat2.getCommandBlocksCount());
+ }
+ return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount);
+ }
+
+ /**
+ * Common method used for cleaning out files matching the passed-in filter under a partition path during rollback of a set of commits
+ */
+ private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
+ Map<FileStatus, Boolean> results, String partitionPath,
+ PathFilter filter) throws IOException {
+ logger.info("Cleaning path " + partitionPath);
+ FileSystem fs = metaClient.getFs();
+ FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
+ for (FileStatus file : toBeDeleted) {
+ boolean success = fs.delete(file.getPath(), false);
+ results.put(file, success);
+ logger.info("Delete file " + file.getPath() + "\t" + success);
+ }
+ return results;
+ }
+
+ /**
+ * Common method used for cleaning out parquet files under a partition path during rollback of a set of commits
+ */
+ private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
+ Map<FileStatus, Boolean> results, String commit, String partitionPath) throws IOException {
+ logger.info("Cleaning path " + partitionPath);
+ FileSystem fs = metaClient.getFs();
+ PathFilter filter = (path) -> {
+ if (path.toString().contains(".parquet")) {
+ String fileCommitTime = FSUtils.getCommitTime(path.getName());
+ return commit.equals(fileCommitTime);
+ }
+ return false;
+ };
+ FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
+ for (FileStatus file : toBeDeleted) {
+ boolean success = fs.delete(file.getPath(), false);
+ results.put(file, success);
+ logger.info("Delete file " + file.getPath() + "\t" + success);
+ }
+ return results;
+ }
+
+
+ private Map<HeaderMetadataType, String> generateHeader(String commit) {
+ // generate metadata
+ Map<HeaderMetadataType, String> header = Maps.newHashMap();
+ header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
+ header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit);
+ header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,
+ String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
+ return header;
+ }
+
+ public interface SerializablePathFilter extends PathFilter, Serializable {
+
+ }
+}
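[Editor's note: since one partition can now yield several requests (a delete plus one append per affected file group), performRollback keys results by partition path and folds them with mergeRollbackStat. A toy illustration of the merge; the two maps and the partition path are hypothetical.]

    HoodieRollbackStat deleteStat = HoodieRollbackStat.newBuilder()
        .withPartitionPath("2019/10/24")
        .withDeletedFileResults(deletedParquetFiles)      // hypothetical Map<FileStatus, Boolean>
        .build();
    HoodieRollbackStat appendStat = HoodieRollbackStat.newBuilder()
        .withPartitionPath("2019/10/24")
        .withRollbackBlockAppendResults(appendedBlocks)   // hypothetical Map<FileStatus, Long>
        .build();

    // reduceByKey(this::mergeRollbackStat) combines them into one stat carrying
    // both the deleted-file results and the command-block counts for the partition.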
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/RollbackRequest.java b/hudi-client/src/main/java/org/apache/hudi/table/RollbackRequest.java
new file mode 100644
index 0000000..0e61988
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/RollbackRequest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+
+/**
+ * Request for performing one rollback action
+ */
+public class RollbackRequest {
+
+ /**
+ * Rollback Action Types
+ */
+ public enum RollbackAction {
+ DELETE_DATA_FILES_ONLY,
+ DELETE_DATA_AND_LOG_FILES,
+ APPEND_ROLLBACK_BLOCK
+ }
+
+ /**
+ * Partition path that needs to be rolled back
+ */
+ private final String partitionPath;
+
+ /**
+ * Rollback Instant
+ */
+ private final HoodieInstant rollbackInstant;
+
+ /**
+ * FileId in case of appending rollback block
+ */
+ private final Option<String> fileId;
+
+ /**
+ * Latest base instant, needed when appending a rollback block
+ */
+ private final Option<String> latestBaseInstant;
+
+ /**
+ * Rollback Action
+ */
+ private final RollbackAction rollbackAction;
+
+ public RollbackRequest(String partitionPath, HoodieInstant rollbackInstant,
+ Option<String> fileId, Option<String> latestBaseInstant, RollbackAction rollbackAction) {
+ this.partitionPath = partitionPath;
+ this.rollbackInstant = rollbackInstant;
+ this.fileId = fileId;
+ this.latestBaseInstant = latestBaseInstant;
+ this.rollbackAction = rollbackAction;
+ }
+
+ public static RollbackRequest createRollbackRequestWithDeleteDataFilesOnlyAction(String partitionPath,
+ HoodieInstant rollbackInstant) {
+ return new RollbackRequest(partitionPath, rollbackInstant, Option.empty(), Option.empty(),
+ RollbackAction.DELETE_DATA_FILES_ONLY);
+ }
+
+ public static RollbackRequest createRollbackRequestWithDeleteDataAndLogFilesAction(String partitionPath,
+ HoodieInstant rollbackInstant) {
+ return new RollbackRequest(partitionPath, rollbackInstant, Option.empty(), Option.empty(),
+ RollbackAction.DELETE_DATA_AND_LOG_FILES);
+ }
+
+ public static RollbackRequest createRollbackRequestWithAppendRollbackBlockAction(String partitionPath, String fileId,
+ String baseInstant, HoodieInstant rollbackInstant) {
+ return new RollbackRequest(partitionPath, rollbackInstant, Option.of(fileId), Option.of(baseInstant),
+ RollbackAction.APPEND_ROLLBACK_BLOCK);
+ }
+
+ public String getPartitionPath() {
+ return partitionPath;
+ }
+
+ public HoodieInstant getRollbackInstant() {
+ return rollbackInstant;
+ }
+
+ public Option<String> getFileId() {
+ return fileId;
+ }
+
+ public Option<String> getLatestBaseInstant() {
+ return latestBaseInstant;
+ }
+
+ public RollbackAction getRollbackAction() {
+ return rollbackAction;
+ }
+}
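[Editor's note: for reference, constructing each request type through the factories above. The HoodieInstant constructor and action constant appear elsewhere in this diff; the partition path, file id, and timestamps are illustrative.]

    HoodieInstant instant =
        new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "20191024193400");

    RollbackRequest deleteDataOnly = RollbackRequest
        .createRollbackRequestWithDeleteDataFilesOnlyAction("2019/10/24", instant);
    RollbackRequest deleteDataAndLogs = RollbackRequest
        .createRollbackRequestWithDeleteDataAndLogFilesAction("2019/10/24", instant);
    RollbackRequest appendBlock = RollbackRequest
        .createRollbackRequestWithAppendRollbackBlockAction("2019/10/24", "file-id-1",
            "20191023", instant);  // fileId and base instant are wrapped in Option internally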