You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2021/09/15 22:53:00 UTC
[hudi] branch master updated: [HUDI-2433] Refactor rollback actions
in hudi-client module (#3664)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 916f12b [HUDI-2433] Refactor rollback actions in hudi-client module (#3664)
916f12b is described below
commit 916f12b7dd4b635c790b17876fa49fb786233e94
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Wed Sep 15 15:52:43 2021 -0700
[HUDI-2433] Refactor rollback actions in hudi-client module (#3664)
---
...java => CopyOnWriteRollbackActionExecutor.java} | 48 ++--
.../rollback/ListingBasedRollbackHelper.java | 249 ++++++++++++++++++++
...ategy.java => MarkerBasedRollbackStrategy.java} | 68 +++++-
...java => MergeOnReadRollbackActionExecutor.java} | 68 ++++--
.../client/common/HoodieFlinkEngineContext.java | 12 +
.../hudi/table/HoodieFlinkCopyOnWriteTable.java | 4 +-
.../hudi/table/HoodieFlinkMergeOnReadTable.java | 4 +-
.../FlinkCopyOnWriteRollbackActionExecutor.java | 71 ------
.../rollback/FlinkMarkerBasedRollbackStrategy.java | 90 --------
.../FlinkMergeOnReadRollbackActionExecutor.java | 77 -------
.../rollback/ListingBasedRollbackHelper.java | 250 --------------------
.../table/upgrade/ZeroToOneUpgradeHandler.java | 2 +-
.../client/common/HoodieJavaEngineContext.java | 10 +
.../hudi/table/HoodieJavaCopyOnWriteTable.java | 4 +-
.../JavaCopyOnWriteRestoreActionExecutor.java | 4 +-
.../JavaCopyOnWriteRollbackActionExecutor.java | 72 ------
.../rollback/JavaListingBasedRollbackHelper.java | 237 -------------------
.../rollback/JavaMarkerBasedRollbackStrategy.java | 78 -------
.../client/common/HoodieSparkEngineContext.java | 9 +
.../hudi/table/HoodieSparkCopyOnWriteTable.java | 4 +-
.../hudi/table/HoodieSparkMergeOnReadTable.java | 5 +-
.../SparkCopyOnWriteRestoreActionExecutor.java | 4 +-
.../SparkMergeOnReadRestoreActionExecutor.java | 4 +-
.../rollback/ListingBasedRollbackHelper.java | 252 ---------------------
.../SparkCopyOnWriteRollbackActionExecutor.java | 73 ------
.../rollback/SparkMarkerBasedRollbackStrategy.java | 93 --------
.../SparkMergeOnReadRollbackActionExecutor.java | 82 -------
.../TestCopyOnWriteRollbackActionExecutor.java | 10 +-
.../TestMergeOnReadRollbackActionExecutor.java | 24 +-
.../TestMarkerBasedRollbackStrategy.java | 6 +-
.../hudi/common/engine/HoodieEngineContext.java | 4 +
.../common/engine/HoodieLocalEngineContext.java | 11 +
.../hudi/common/function/FunctionWrapper.java | 11 +
.../common/function/SerializableBiFunction.java | 34 +++
34 files changed, 512 insertions(+), 1462 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
similarity index 60%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseCopyOnWriteRollbackActionExecutor.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
index fa74f7f..44b5492 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseCopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
@@ -33,31 +33,40 @@ import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
-public abstract class BaseCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseRollbackActionExecutor<T, I, K, O> {
+public class CopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseRollbackActionExecutor<T, I, K, O> {
- private static final Logger LOG = LogManager.getLogger(BaseCopyOnWriteRollbackActionExecutor.class);
+ private static final Logger LOG = LogManager.getLogger(CopyOnWriteRollbackActionExecutor.class);
- public BaseCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
- HoodieWriteConfig config,
- HoodieTable<T, I, K, O> table,
- String instantTime,
- HoodieInstant commitInstant,
- boolean deleteInstants) {
+ public CopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable<T, I, K, O> table,
+ String instantTime,
+ HoodieInstant commitInstant,
+ boolean deleteInstants) {
super(context, config, table, instantTime, commitInstant, deleteInstants);
}
- public BaseCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
- HoodieWriteConfig config,
- HoodieTable<T, I, K, O> table,
- String instantTime,
- HoodieInstant commitInstant,
- boolean deleteInstants,
- boolean skipTimelinePublish,
- boolean useMarkerBasedStrategy) {
+ public CopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable<T, I, K, O> table,
+ String instantTime,
+ HoodieInstant commitInstant,
+ boolean deleteInstants,
+ boolean skipTimelinePublish,
+ boolean useMarkerBasedStrategy) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
}
@Override
+ protected RollbackStrategy getRollbackStrategy() {
+ if (useMarkerBasedStrategy) {
+ return new MarkerBasedRollbackStrategy(table, context, config, instantTime);
+ } else {
+ return this::executeRollbackUsingFileListing;
+ }
+ }
+
+ @Override
protected List<HoodieRollbackStat> executeRollback() {
HoodieTimer rollbackTimer = new HoodieTimer();
rollbackTimer.startTimer();
@@ -88,4 +97,11 @@ public abstract class BaseCopyOnWriteRollbackActionExecutor<T extends HoodieReco
LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer());
return stats;
}
+
+ @Override
+ protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
+ List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(
+ context, table.getMetaClient().getBasePath(), config);
+ return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
+ }
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
new file mode 100644
index 0000000..8490872
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieRollbackException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Performs Rollback of Hoodie Tables.
+ */
+public class ListingBasedRollbackHelper implements Serializable {
+ private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class);
+
+ private final HoodieTableMetaClient metaClient;
+ private final HoodieWriteConfig config;
+
+ public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
+ this.metaClient = metaClient;
+ this.config = config;
+ }
+
+ /**
+ * Performs all rollback actions that we have collected in parallel.
+ */
+ public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, HoodieInstant instantToRollback,
+ List<ListingBasedRollbackRequest> rollbackRequests) {
+ int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
+ context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions");
+ return context.mapToPairAndReduceByKey(rollbackRequests,
+ rollbackRequest -> maybeDeleteAndCollectStats(rollbackRequest, instantToRollback, true),
+ RollbackUtils::mergeRollbackStat,
+ parallelism);
+ }
+
+ /**
+ * Collect all file info that needs to be rollbacked.
+ */
+ public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback,
+ List<ListingBasedRollbackRequest> rollbackRequests) {
+ int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
+ context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade");
+ return context.mapToPairAndReduceByKey(rollbackRequests,
+ rollbackRequest -> maybeDeleteAndCollectStats(rollbackRequest, instantToRollback, false),
+ RollbackUtils::mergeRollbackStat,
+ parallelism);
+ }
+
+ /**
+ * May be delete interested files and collect stats or collect stats only.
+ *
+ * @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested.
+ * @param doDelete {@code true} if deletion has to be done.
+ * {@code false} if only stats are to be collected w/o performing any deletes.
+ * @return stats collected with or w/o actual deletions.
+ */
+ private Pair<String, HoodieRollbackStat> maybeDeleteAndCollectStats(ListingBasedRollbackRequest rollbackRequest,
+ HoodieInstant instantToRollback,
+ boolean doDelete) throws IOException {
+ switch (rollbackRequest.getType()) {
+ case DELETE_DATA_FILES_ONLY: {
+ final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
+ rollbackRequest.getPartitionPath(), doDelete);
+ return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
+ HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+ .withDeletedFileResults(filesToDeletedStatus).build());
+ }
+ case DELETE_DATA_AND_LOG_FILES: {
+ final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
+ return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
+ HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+ .withDeletedFileResults(filesToDeletedStatus).build());
+ }
+ case APPEND_ROLLBACK_BLOCK: {
+ String fileId = rollbackRequest.getFileId().get();
+ String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
+
+ // collect all log files that is supposed to be deleted with this rollback
+ Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
+ FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
+ fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
+ .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
+
+ HoodieLogFormat.Writer writer = null;
+ try {
+ writer = HoodieLogFormat.newWriterBuilder()
+ .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
+ .withFileId(fileId)
+ .overBaseCommit(latestBaseInstant)
+ .withFs(metaClient.getFs())
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+
+ // generate metadata
+ if (doDelete) {
+ Map<HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
+ // if update belongs to an existing log file
+ writer.appendBlock(new HoodieCommandBlock(header));
+ }
+ } catch (IOException | InterruptedException io) {
+ throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
+ } finally {
+ try {
+ if (writer != null) {
+ writer.close();
+ }
+ } catch (IOException io) {
+ throw new HoodieIOException("Error appending rollback block..", io);
+ }
+ }
+
+ // This step is intentionally done after writer is closed. Guarantees that
+ // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
+ // cloud-storage : HUDI-168
+ Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
+ metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
+ 1L
+ );
+
+ return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
+ HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+ .withRollbackBlockAppendResults(filesToNumBlocksRollback)
+ .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build());
+ }
+ default:
+ throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
+ }
+ }
+
+ /**
+ * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
+ */
+ private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
+ String commit, String partitionPath, boolean doDelete) throws IOException {
+ LOG.info("Cleaning path " + partitionPath);
+ String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+ SerializablePathFilter filter = (path) -> {
+ if (path.toString().endsWith(basefileExtension)) {
+ String fileCommitTime = FSUtils.getCommitTime(path.getName());
+ return commit.equals(fileCommitTime);
+ } else if (FSUtils.isLogFile(path)) {
+ // Since the baseCommitTime is the only commit for new log files, it's okay here
+ String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
+ return commit.equals(fileCommitTime);
+ }
+ return false;
+ };
+
+ final Map<FileStatus, Boolean> results = new HashMap<>();
+ FileSystem fs = metaClient.getFs();
+ FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
+ for (FileStatus file : toBeDeleted) {
+ if (doDelete) {
+ boolean success = fs.delete(file.getPath(), false);
+ results.put(file, success);
+ LOG.info("Delete file " + file.getPath() + "\t" + success);
+ } else {
+ results.put(file, true);
+ }
+ }
+ return results;
+ }
+
+ /**
+ * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
+ */
+ private Map<FileStatus, Boolean> deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
+ String commit, String partitionPath, boolean doDelete) throws IOException {
+ final Map<FileStatus, Boolean> results = new HashMap<>();
+ LOG.info("Cleaning path " + partitionPath);
+ FileSystem fs = metaClient.getFs();
+ String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+ PathFilter filter = (path) -> {
+ if (path.toString().contains(basefileExtension)) {
+ String fileCommitTime = FSUtils.getCommitTime(path.getName());
+ return commit.equals(fileCommitTime);
+ }
+ return false;
+ };
+ FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
+ for (FileStatus file : toBeDeleted) {
+ if (doDelete) {
+ boolean success = fs.delete(file.getPath(), false);
+ results.put(file, success);
+ LOG.info("Delete file " + file.getPath() + "\t" + success);
+ } else {
+ results.put(file, true);
+ }
+ }
+ return results;
+ }
+
+ private Map<HeaderMetadataType, String> generateHeader(String commit) {
+ // generate metadata
+ Map<HeaderMetadataType, String> header = new HashMap<>(3);
+ header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
+ header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit);
+ header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,
+ String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
+ return header;
+ }
+
+ public interface SerializablePathFilter extends PathFilter, Serializable {
+
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
similarity index 63%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
index cc596ba..1bfd4b1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
@@ -20,34 +20,43 @@ package org.apache.hudi.table.action.rollback;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
+import org.apache.hudi.table.marker.WriteMarkers;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
/**
* Performs rollback using marker files generated during the write..
*/
-public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecordPayload, I, K, O> implements BaseRollbackActionExecutor.RollbackStrategy {
+public class MarkerBasedRollbackStrategy implements BaseRollbackActionExecutor.RollbackStrategy {
- private static final Logger LOG = LogManager.getLogger(AbstractMarkerBasedRollbackStrategy.class);
+ private static final Logger LOG = LogManager.getLogger(MarkerBasedRollbackStrategy.class);
- protected final HoodieTable<T, I, K, O> table;
+ protected final HoodieTable<?, ?, ?, ?> table;
protected final transient HoodieEngineContext context;
@@ -57,7 +66,7 @@ public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecord
protected final String instantTime;
- public AbstractMarkerBasedRollbackStrategy(HoodieTable<T, I, K, O> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
+ public MarkerBasedRollbackStrategy(HoodieTable<?, ?, ?, ?> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
this.table = table;
this.context = context;
this.basePath = table.getMetaClient().getBasePath();
@@ -124,8 +133,8 @@ public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecord
// the information of files appended to is required for metadata sync
Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
- table.getMetaClient().getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
- 1L);
+ table.getMetaClient().getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
+ 1L);
return HoodieRollbackStat.newBuilder()
.withPartitionPath(partitionPath)
@@ -135,13 +144,48 @@ public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecord
/**
* Returns written log file size map for the respective baseCommitTime to assist in metadata table syncing.
- * @param partitionPath partition path of interest
- * @param baseCommitTime base commit time of interest
- * @param fileId fileId of interest
+ *
+ * @param partitionPathStr partition path of interest
+ * @param baseCommitTime base commit time of interest
+ * @param fileId fileId of interest
* @return Map<FileStatus, File size>
* @throws IOException
*/
- protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPath, String baseCommitTime, String fileId) throws IOException {
- return Collections.EMPTY_MAP;
+ protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException {
+ // collect all log files that is supposed to be deleted with this rollback
+ return FSUtils.getAllLogFiles(table.getMetaClient().getFs(),
+ FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime)
+ .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
+ }
+
+ @Override
+ public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
+ try {
+ List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
+ table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism());
+ int parallelism = Math.max(Math.min(markerPaths.size(), config.getRollbackParallelism()), 1);
+ context.setJobStatus(this.getClass().getSimpleName(), "Rolling back using marker files");
+ return context.mapToPairAndReduceByKey(markerPaths, markerFilePath -> {
+ String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
+ IOType type = IOType.valueOf(typeStr);
+ HoodieRollbackStat rollbackStat;
+ switch (type) {
+ case MERGE:
+ rollbackStat = undoMerge(WriteMarkers.stripMarkerSuffix(markerFilePath));
+ break;
+ case APPEND:
+ rollbackStat = undoAppend(WriteMarkers.stripMarkerSuffix(markerFilePath), instantToRollback);
+ break;
+ case CREATE:
+ rollbackStat = undoCreate(WriteMarkers.stripMarkerSuffix(markerFilePath));
+ break;
+ default:
+ throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
+ }
+ return new ImmutablePair<>(rollbackStat.getPartitionPath(), rollbackStat);
+ }, RollbackUtils::mergeRollbackStat, parallelism);
+ } catch (Exception e) {
+ throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
+ }
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
similarity index 54%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMergeOnReadRollbackActionExecutor.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
index 2e75144..87d2628 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
@@ -7,13 +7,14 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.hudi.table.action.rollback;
@@ -24,38 +25,50 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-public abstract class BaseMergeOnReadRollbackActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseRollbackActionExecutor<T, I, K, O> {
+public class MergeOnReadRollbackActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseRollbackActionExecutor<T, I, K, O> {
- private static final Logger LOG = LogManager.getLogger(BaseMergeOnReadRollbackActionExecutor.class);
+ private static final Logger LOG = LogManager.getLogger(MergeOnReadRollbackActionExecutor.class);
- public BaseMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
- HoodieWriteConfig config,
- HoodieTable<T, I, K, O> table,
- String instantTime,
- HoodieInstant commitInstant,
- boolean deleteInstants) {
+ public MergeOnReadRollbackActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable<T, I, K, O> table,
+ String instantTime,
+ HoodieInstant commitInstant,
+ boolean deleteInstants) {
super(context, config, table, instantTime, commitInstant, deleteInstants);
}
- public BaseMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
- HoodieWriteConfig config,
- HoodieTable<T, I, K, O> table,
- String instantTime,
- HoodieInstant commitInstant,
- boolean deleteInstants,
- boolean skipTimelinePublish,
- boolean useMarkerBasedStrategy) {
+ public MergeOnReadRollbackActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable<T, I, K, O> table,
+ String instantTime,
+ HoodieInstant commitInstant,
+ boolean deleteInstants,
+ boolean skipTimelinePublish,
+ boolean useMarkerBasedStrategy) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
}
@Override
+ protected RollbackStrategy getRollbackStrategy() {
+ if (useMarkerBasedStrategy) {
+ return new MarkerBasedRollbackStrategy(table, context, config, instantTime);
+ } else {
+ return this::executeRollbackUsingFileListing;
+ }
+ }
+
+ @Override
protected List<HoodieRollbackStat> executeRollback() {
HoodieTimer rollbackTimer = new HoodieTimer();
rollbackTimer.startTimer();
@@ -93,4 +106,15 @@ public abstract class BaseMergeOnReadRollbackActionExecutor<T extends HoodieReco
LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer());
return allRollbackStats;
}
+
+ @Override
+ protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant resolvedInstant) {
+ List<ListingBasedRollbackRequest> rollbackRequests;
+ try {
+ rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(resolvedInstant, table, context);
+ } catch (IOException e) {
+ throw new HoodieIOException("Error generating rollback requests by file listing.", e);
+ }
+ return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, resolvedInstant, rollbackRequests);
+ }
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index 2fc5af1..66b7e78 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
@@ -32,6 +33,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -43,6 +45,7 @@ import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWra
import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
+import static org.apache.hudi.common.function.FunctionWrapper.throwingReduceWrapper;
/**
* A flink engine implementation of HoodieEngineContext.
@@ -75,6 +78,15 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
}
@Override
+ public <I, K, V> List<V> mapToPairAndReduceByKey(List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+ return data.stream().parallel().map(throwingMapToPairWrapper(mapToPairFunc))
+ .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
+ .map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ }
+
+ @Override
public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(Collectors.toList());
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 8a9b4bf..27571bc 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -53,7 +53,7 @@ import org.apache.hudi.table.action.commit.FlinkInsertPreppedCommitActionExecuto
import org.apache.hudi.table.action.commit.FlinkMergeHelper;
import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor;
-import org.apache.hudi.table.action.rollback.FlinkCopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -305,7 +305,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
@Override
public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
- return new FlinkCopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
+ return new CopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
}
@Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
index bfe8b6f..4614270 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
@@ -37,7 +37,7 @@ import org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExe
import org.apache.hudi.table.action.commit.delta.FlinkUpsertPreppedDeltaCommitActionExecutor;
import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
import org.apache.hudi.table.action.compact.FlinkScheduleCompactionActionExecutor;
-import org.apache.hudi.table.action.rollback.FlinkMergeOnReadRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
import java.util.List;
import java.util.Map;
@@ -108,7 +108,7 @@ public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload>
@Override
public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
- return new FlinkMergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
+ return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
}
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java
deleted file mode 100644
index 47039a3..0000000
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-
-import java.util.List;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class FlinkCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload> extends
- BaseCopyOnWriteRollbackActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
- public FlinkCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
- HoodieWriteConfig config,
- HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
- String instantTime,
- HoodieInstant commitInstant,
- boolean deleteInstants) {
- super(context, config, table, instantTime, commitInstant, deleteInstants);
- }
-
- public FlinkCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
- HoodieWriteConfig config,
- HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
- String instantTime,
- HoodieInstant commitInstant,
- boolean deleteInstants,
- boolean skipTimelinePublish,
- boolean useMarkerBasedStrategy) {
- super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
- }
-
- @Override
- protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() {
- if (useMarkerBasedStrategy) {
- return new FlinkMarkerBasedRollbackStrategy(table, context, config, instantTime);
- } else {
- return this::executeRollbackUsingFileListing;
- }
- }
-
- @Override
- protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
- List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(
- context, table.getMetaClient().getBasePath(), config);
- return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
- }
-}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java
deleted file mode 100644
index bb7ec76..0000000
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieRollbackException;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
-import org.apache.hudi.table.marker.WriteMarkers;
-
-import org.apache.hadoop.fs.FileStatus;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import scala.Tuple2;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class FlinkMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> extends AbstractMarkerBasedRollbackStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
- public FlinkMarkerBasedRollbackStrategy(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
- super(table, context, config, instantTime);
- }
-
- @Override
- public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
- try {
- List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
- table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism());
- List<HoodieRollbackStat> rollbackStats = context.map(markerPaths, markerFilePath -> {
- String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
- IOType type = IOType.valueOf(typeStr);
- switch (type) {
- case MERGE:
- return undoMerge(WriteMarkers.stripMarkerSuffix(markerFilePath));
- case APPEND:
- return undoAppend(WriteMarkers.stripMarkerSuffix(markerFilePath), instantToRollback);
- case CREATE:
- return undoCreate(WriteMarkers.stripMarkerSuffix(markerFilePath));
- default:
- throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
- }
- }, 0);
-
- return rollbackStats.stream().map(rollbackStat -> new Tuple2<>(rollbackStat.getPartitionPath(), rollbackStat))
- .collect(Collectors.groupingBy(Tuple2::_1))
- .values()
- .stream()
- .map(x -> x.stream().map(y -> y._2).reduce(RollbackUtils::mergeRollbackStat).get())
- .collect(Collectors.toList());
- } catch (Exception e) {
- throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
- }
- }
-
- protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException {
- // collect all log files that is supposed to be deleted with this rollback
- return FSUtils.getAllLogFiles(table.getMetaClient().getFs(),
- FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime)
- .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
- }
-}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java
deleted file mode 100644
index 25b20a5..0000000
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.table.HoodieTable;
-
-import java.io.IOException;
-import java.util.List;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class FlinkMergeOnReadRollbackActionExecutor<T extends HoodieRecordPayload> extends
- BaseMergeOnReadRollbackActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
- public FlinkMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
- HoodieWriteConfig config,
- HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
- String instantTime,
- HoodieInstant commitInstant,
- boolean deleteInstants) {
- super(context, config, table, instantTime, commitInstant, deleteInstants);
- }
-
- public FlinkMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
- HoodieWriteConfig config,
- HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
- String instantTime,
- HoodieInstant commitInstant,
- boolean deleteInstants,
- boolean skipTimelinePublish,
- boolean useMarkerBasedStrategy) {
- super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
- }
-
- @Override
- protected RollbackStrategy getRollbackStrategy() {
- if (useMarkerBasedStrategy) {
- return new FlinkMarkerBasedRollbackStrategy(table, context, config, instantTime);
- } else {
- return this::executeRollbackUsingFileListing;
- }
- }
-
- @Override
- protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant resolvedInstant) {
- List<ListingBasedRollbackRequest> rollbackRequests;
- try {
- rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(resolvedInstant, table, context);
- } catch (IOException e) {
- throw new HoodieIOException("Error generating rollback requests by file listing.", e);
- }
- return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, resolvedInstant, rollbackRequests);
- }
-}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
deleted file mode 100644
index f03b211..0000000
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.collection.ImmutablePair;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieRollbackException;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/**
- * Performs Rollback of Hoodie Tables.
- */
-public class ListingBasedRollbackHelper implements Serializable {
-
- private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class);
-
- private final HoodieTableMetaClient metaClient;
- private final HoodieWriteConfig config;
-
- public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
- this.metaClient = metaClient;
- this.config = config;
- }
-
- /**
- * Performs all rollback actions that we have collected in parallel.
- */
- public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
- Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, true);
-
- Map<String, List<Pair<String, HoodieRollbackStat>>> collect = partitionPathRollbackStatsPairs.entrySet()
- .stream()
- .map(x -> Pair.of(x.getKey(), x.getValue())).collect(Collectors.groupingBy(Pair::getLeft));
- return collect.values().stream()
- .map(pairs -> pairs.stream().map(Pair::getRight).reduce(RollbackUtils::mergeRollbackStat).orElse(null))
- .filter(Objects::nonNull)
- .collect(Collectors.toList());
- }
-
- /**
- * Collect all file info that needs to be rollbacked.
- */
- public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
- Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, false);
- return new ArrayList<>(partitionPathRollbackStatsPairs.values());
- }
-
- /**
- * May be delete interested files and collect stats or collect stats only.
- *
- * @param context instance of {@link HoodieEngineContext} to use.
- * @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested.
- * @param rollbackRequests List of {@link ListingBasedRollbackRequest} to be operated on.
- * @param doDelete {@code true} if deletion has to be done. {@code false} if only stats are to be collected w/o performing any deletes.
- * @return stats collected with or w/o actual deletions.
- */
- Map<String, HoodieRollbackStat> maybeDeleteAndCollectStats(HoodieEngineContext context,
- HoodieInstant instantToRollback,
- List<ListingBasedRollbackRequest> rollbackRequests,
- boolean doDelete) {
- return context.mapToPair(rollbackRequests, rollbackRequest -> {
- switch (rollbackRequest.getType()) {
- case DELETE_DATA_FILES_ONLY: {
- final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
- rollbackRequest.getPartitionPath(), doDelete);
- return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
- HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
- .withDeletedFileResults(filesToDeletedStatus).build());
- }
- case DELETE_DATA_AND_LOG_FILES: {
- final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
- return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
- HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
- .withDeletedFileResults(filesToDeletedStatus).build());
- }
- case APPEND_ROLLBACK_BLOCK: {
- String fileId = rollbackRequest.getFileId().get();
- String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
-
- // collect all log files that is supposed to be deleted with this rollback
- Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
- FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
- fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
- .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
-
- HoodieLogFormat.Writer writer = null;
- try {
- writer = HoodieLogFormat.newWriterBuilder()
- .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
- .withFileId(fileId)
- .overBaseCommit(latestBaseInstant)
- .withFs(metaClient.getFs())
- .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-
- // generate metadata
- if (doDelete) {
- Map<HoodieLogBlock.HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
- // if update belongs to an existing log file
- writer.appendBlock(new HoodieCommandBlock(header));
- }
- } catch (IOException | InterruptedException io) {
- throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
- } finally {
- try {
- if (writer != null) {
- writer.close();
- }
- } catch (IOException io) {
- throw new HoodieIOException("Error appending rollback block..", io);
- }
- }
-
- // This step is intentionally done after writer is closed. Guarantees that
- // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
- // cloud-storage : HUDI-168
- Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
- metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
- 1L
- );
- return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
- HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
- .withRollbackBlockAppendResults(filesToNumBlocksRollback)
- .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build());
- }
- default:
- throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
- }
- }, 0);
- }
-
- /**
- * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
- */
- private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
- String commit, String partitionPath, boolean doDelete) throws IOException {
- LOG.info("Cleaning path " + partitionPath);
- String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
- SerializablePathFilter filter = (path) -> {
- if (path.toString().endsWith(basefileExtension)) {
- String fileCommitTime = FSUtils.getCommitTime(path.getName());
- return commit.equals(fileCommitTime);
- } else if (FSUtils.isLogFile(path)) {
- // Since the baseCommitTime is the only commit for new log files, it's okay here
- String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
- return commit.equals(fileCommitTime);
- }
- return false;
- };
-
- final Map<FileStatus, Boolean> results = new HashMap<>();
- FileSystem fs = metaClient.getFs();
- FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
- for (FileStatus file : toBeDeleted) {
- if (doDelete) {
- boolean success = fs.delete(file.getPath(), false);
- results.put(file, success);
- LOG.info("Delete file " + file.getPath() + "\t" + success);
- } else {
- results.put(file, true);
- }
- }
- return results;
- }
-
- /**
- * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
- */
- private Map<FileStatus, Boolean> deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
- String commit, String partitionPath, boolean doDelete) throws IOException {
- final Map<FileStatus, Boolean> results = new HashMap<>();
- LOG.info("Cleaning path " + partitionPath);
- FileSystem fs = metaClient.getFs();
- String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
- PathFilter filter = (path) -> {
- if (path.toString().contains(basefileExtension)) {
- String fileCommitTime = FSUtils.getCommitTime(path.getName());
- return commit.equals(fileCommitTime);
- }
- return false;
- };
- FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
- for (FileStatus file : toBeDeleted) {
- if (doDelete) {
- boolean success = fs.delete(file.getPath(), false);
- results.put(file, success);
- LOG.info("Delete file " + file.getPath() + "\t" + success);
- } else {
- results.put(file, true);
- }
- }
- return results;
- }
-
- private Map<HoodieLogBlock.HeaderMetadataType, String> generateHeader(String commit) {
- // generate metadata
- Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(3);
- header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
- header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit);
- header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
- String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
- return header;
- }
-
- public interface SerializablePathFilter extends PathFilter, Serializable {
-
- }
-}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index 59e94e5..cb024c6 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -27,8 +27,8 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
+import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
import java.util.List;
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
index 013e094..f7a28e2 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
@@ -40,6 +41,7 @@ import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWra
import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
+import static org.apache.hudi.common.function.FunctionWrapper.throwingReduceWrapper;
/**
* A java engine implementation of HoodieEngineContext.
@@ -60,6 +62,14 @@ public class HoodieJavaEngineContext extends HoodieEngineContext {
}
@Override
+ public <I, K, V> List<V> mapToPairAndReduceByKey(List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+ return data.stream().parallel().map(throwingMapToPairWrapper(mapToPairFunc))
+ .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
+ .map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).get())
+ .collect(Collectors.toList());
+ }
+
+ @Override
public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(toList());
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 06e66a1..7715bf9 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -50,7 +50,7 @@ import org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor
import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.restore.JavaCopyOnWriteRestoreActionExecutor;
-import org.apache.hudi.table.action.rollback.JavaCopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
import java.util.List;
@@ -193,7 +193,7 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
String rollbackInstantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
- return new JavaCopyOnWriteRollbackActionExecutor(
+ return new CopyOnWriteRollbackActionExecutor(
context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java
index 75c1e0e..f7677ae 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java
@@ -30,7 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.rollback.JavaCopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import java.util.List;
@@ -48,7 +48,7 @@ public class JavaCopyOnWriteRestoreActionExecutor<T extends HoodieRecordPayload>
@Override
protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) {
table.getMetaClient().reloadActiveTimeline();
- JavaCopyOnWriteRollbackActionExecutor rollbackActionExecutor = new JavaCopyOnWriteRollbackActionExecutor(
+ CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(
context,
config,
table,
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaCopyOnWriteRollbackActionExecutor.java
deleted file mode 100644
index 15e3932..0000000
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaCopyOnWriteRollbackActionExecutor.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-
-import java.util.List;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class JavaCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload> extends
- BaseCopyOnWriteRollbackActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
- public JavaCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
- HoodieWriteConfig config,
- HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
- String instantTime,
- HoodieInstant commitInstant,
- boolean deleteInstants) {
- super(context, config, table, instantTime, commitInstant, deleteInstants);
- }
-
- public JavaCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
- HoodieWriteConfig config,
- HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
- String instantTime,
- HoodieInstant commitInstant,
- boolean deleteInstants,
- boolean skipTimelinePublish,
- boolean useMarkerBasedStrategy) {
- super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
- }
-
- @Override
- protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() {
- if (useMarkerBasedStrategy) {
- return new JavaMarkerBasedRollbackStrategy(table, context, config, instantTime);
- } else {
- return this::executeRollbackUsingFileListing;
- }
- }
-
- @Override
- protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
- List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(
- context, table.getMetaClient().getBasePath(), config);
- return new JavaListingBasedRollbackHelper(table.getMetaClient(), config)
- .performRollback(context, instantToRollback, rollbackRequests);
- }
-}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaListingBasedRollbackHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaListingBasedRollbackHelper.java
deleted file mode 100644
index 5331ca5..0000000
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaListingBasedRollbackHelper.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.collection.ImmutablePair;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieRollbackException;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/**
- * Performs Rollback of Hoodie Tables.
- */
-public class JavaListingBasedRollbackHelper implements Serializable {
-
- private static final Logger LOG = LogManager.getLogger(JavaListingBasedRollbackHelper.class);
-
- private final HoodieTableMetaClient metaClient;
- private final HoodieWriteConfig config;
-
- public JavaListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
- this.metaClient = metaClient;
- this.config = config;
- }
-
- /**
- * Performs all rollback actions that we have collected in parallel.
- */
- public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
- Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, true);
-
- Map<String, List<Pair<String, HoodieRollbackStat>>> collect = partitionPathRollbackStatsPairs.entrySet()
- .stream()
- .map(x -> Pair.of(x.getKey(), x.getValue())).collect(Collectors.groupingBy(Pair::getLeft));
- return collect.values().stream()
- .map(pairs -> pairs.stream().map(Pair::getRight).reduce(RollbackUtils::mergeRollbackStat).orElse(null))
- .filter(Objects::nonNull)
- .collect(Collectors.toList());
- }
-
- /**
- * Collect all file info that needs to be rollbacked.
- */
- public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
- Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, false);
- return new ArrayList<>(partitionPathRollbackStatsPairs.values());
- }
-
- /**
- * May be delete interested files and collect stats or collect stats only.
- *
- * @param context instance of {@link HoodieEngineContext} to use.
- * @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested.
- * @param rollbackRequests List of {@link ListingBasedRollbackRequest} to be operated on.
- * @param doDelete {@code true} if deletion has to be done. {@code false} if only stats are to be collected w/o performing any deletes.
- * @return stats collected with or w/o actual deletions.
- */
- Map<String, HoodieRollbackStat> maybeDeleteAndCollectStats(HoodieEngineContext context,
- HoodieInstant instantToRollback,
- List<ListingBasedRollbackRequest> rollbackRequests,
- boolean doDelete) {
- return context.mapToPair(rollbackRequests, rollbackRequest -> {
- switch (rollbackRequest.getType()) {
- case DELETE_DATA_FILES_ONLY: {
- final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
- rollbackRequest.getPartitionPath(), doDelete);
- return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
- HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
- .withDeletedFileResults(filesToDeletedStatus).build());
- }
- case DELETE_DATA_AND_LOG_FILES: {
- final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
- return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
- HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
- .withDeletedFileResults(filesToDeletedStatus).build());
- }
- case APPEND_ROLLBACK_BLOCK: {
- HoodieLogFormat.Writer writer = null;
- try {
- writer = HoodieLogFormat.newWriterBuilder()
- .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
- .withFileId(rollbackRequest.getFileId().get())
- .overBaseCommit(rollbackRequest.getLatestBaseInstant().get()).withFs(metaClient.getFs())
- .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-
- // generate metadata
- if (doDelete) {
- Map<HoodieLogBlock.HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
- // if update belongs to an existing log file
- writer.appendBlock(new HoodieCommandBlock(header));
- }
- } catch (IOException | InterruptedException io) {
- throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
- } finally {
- try {
- if (writer != null) {
- writer.close();
- }
- } catch (IOException io) {
- throw new HoodieIOException("Error appending rollback block..", io);
- }
- }
-
- // This step is intentionally done after writer is closed. Guarantees that
- // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
- // cloud-storage : HUDI-168
- Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
- metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()), 1L
- );
- return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
- HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
- .withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
- }
- default:
- throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
- }
- }, 0);
- }
-
- /**
- * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
- */
- private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
- String commit, String partitionPath, boolean doDelete) throws IOException {
- LOG.info("Cleaning path " + partitionPath);
- String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
- SerializablePathFilter filter = (path) -> {
- if (path.toString().endsWith(basefileExtension)) {
- String fileCommitTime = FSUtils.getCommitTime(path.getName());
- return commit.equals(fileCommitTime);
- } else if (FSUtils.isLogFile(path)) {
- // Since the baseCommitTime is the only commit for new log files, it's okay here
- String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
- return commit.equals(fileCommitTime);
- }
- return false;
- };
-
- final Map<FileStatus, Boolean> results = new HashMap<>();
- FileSystem fs = metaClient.getFs();
- FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
- for (FileStatus file : toBeDeleted) {
- if (doDelete) {
- boolean success = fs.delete(file.getPath(), false);
- results.put(file, success);
- LOG.info("Delete file " + file.getPath() + "\t" + success);
- } else {
- results.put(file, true);
- }
- }
- return results;
- }
-
- /**
- * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
- */
- private Map<FileStatus, Boolean> deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
- String commit, String partitionPath, boolean doDelete) throws IOException {
- final Map<FileStatus, Boolean> results = new HashMap<>();
- LOG.info("Cleaning path " + partitionPath);
- FileSystem fs = metaClient.getFs();
- String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
- PathFilter filter = (path) -> {
- if (path.toString().contains(basefileExtension)) {
- String fileCommitTime = FSUtils.getCommitTime(path.getName());
- return commit.equals(fileCommitTime);
- }
- return false;
- };
- FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
- for (FileStatus file : toBeDeleted) {
- if (doDelete) {
- boolean success = fs.delete(file.getPath(), false);
- results.put(file, success);
- LOG.info("Delete file " + file.getPath() + "\t" + success);
- } else {
- results.put(file, true);
- }
- }
- return results;
- }
-
- private Map<HoodieLogBlock.HeaderMetadataType, String> generateHeader(String commit) {
- // generate metadata
- Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(3);
- header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
- header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit);
- header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
- String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
- return header;
- }
-
- public interface SerializablePathFilter extends PathFilter, Serializable {
-
- }
-}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java
deleted file mode 100644
index 150f663..0000000
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieRollbackException;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
-import org.apache.hudi.table.marker.WriteMarkers;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class JavaMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> extends AbstractMarkerBasedRollbackStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
- public JavaMarkerBasedRollbackStrategy(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
- HoodieEngineContext context,
- HoodieWriteConfig config,
- String instantTime) {
- super(table, context, config, instantTime);
- }
-
- @Override
- public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
- try {
- List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
- table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism());
- List<HoodieRollbackStat> rollbackStats = context.map(markerPaths, markerFilePath -> {
- String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
- IOType type = IOType.valueOf(typeStr);
- switch (type) {
- case MERGE:
- return undoMerge(WriteMarkers.stripMarkerSuffix(markerFilePath));
- case APPEND:
- return undoAppend(WriteMarkers.stripMarkerSuffix(markerFilePath), instantToRollback);
- case CREATE:
- return undoCreate(WriteMarkers.stripMarkerSuffix(markerFilePath));
- default:
- throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
- }
- }, 0);
-
- return rollbackStats.stream().map(rollbackStat -> Pair.of(rollbackStat.getPartitionPath(), rollbackStat))
- .collect(Collectors.groupingBy(Pair::getKey))
- .values()
- .stream()
- .map(x -> x.stream().map(y -> y.getValue()).reduce(RollbackUtils::mergeRollbackStat).get())
- .collect(Collectors.toList());
- } catch (Exception e) {
- throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
- }
- }
-}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index d869ec7..ad1d7cd 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
@@ -74,6 +75,14 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
}
@Override
+ public <I, K, V> List<V> mapToPairAndReduceByKey(List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+ return javaSparkContext.parallelize(data, parallelism).mapToPair(input -> {
+ Pair<K, V> pair = mapToPairFunc.call(input);
+ return new Tuple2<>(pair.getLeft(), pair.getRight());
+ }).reduceByKey(reduceFunc::apply).map(Tuple2::_2).collect();
+ }
+
+ @Override
public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
return javaSparkContext.parallelize(data, parallelism).flatMap(x -> func.apply(x).iterator()).collect();
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 6a2bd6f..26d14cf 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -65,7 +65,7 @@ import org.apache.hudi.table.action.commit.SparkMergeHelper;
import org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.restore.SparkCopyOnWriteRestoreActionExecutor;
-import org.apache.hudi.table.action.rollback.SparkCopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -247,7 +247,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
@Override
public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
- return new SparkCopyOnWriteRollbackActionExecutor((HoodieSparkEngineContext) context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
+ return new CopyOnWriteRollbackActionExecutor((HoodieSparkEngineContext) context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index 997116e..2db4eeb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -49,7 +49,8 @@ import org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExec
import org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor;
import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
import org.apache.hudi.table.action.restore.SparkMergeOnReadRestoreActionExecutor;
-import org.apache.hudi.table.action.rollback.SparkMergeOnReadRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
+
import org.apache.spark.api.java.JavaRDD;
import java.util.List;
@@ -146,7 +147,7 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
String rollbackInstantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
- return new SparkMergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
+ return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java
index 101b321..9c6ec6e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java
@@ -30,7 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.rollback.SparkCopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.spark.api.java.JavaRDD;
@@ -49,7 +49,7 @@ public class SparkCopyOnWriteRestoreActionExecutor<T extends HoodieRecordPayload
@Override
protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) {
table.getMetaClient().reloadActiveTimeline();
- SparkCopyOnWriteRollbackActionExecutor rollbackActionExecutor = new SparkCopyOnWriteRollbackActionExecutor(
+ CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(
(HoodieSparkEngineContext) context,
config,
table,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkMergeOnReadRestoreActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkMergeOnReadRestoreActionExecutor.java
index c320579..ebca1fe 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkMergeOnReadRestoreActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkMergeOnReadRestoreActionExecutor.java
@@ -29,7 +29,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.rollback.SparkMergeOnReadRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
import org.apache.spark.api.java.JavaRDD;
@@ -48,7 +48,7 @@ public class SparkMergeOnReadRestoreActionExecutor<T extends HoodieRecordPayload
@Override
protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) {
table.getMetaClient().reloadActiveTimeline();
- SparkMergeOnReadRollbackActionExecutor rollbackActionExecutor = new SparkMergeOnReadRollbackActionExecutor(
+ MergeOnReadRollbackActionExecutor rollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
context,
config,
table,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
deleted file mode 100644
index fcb3882..0000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieRollbackException;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-import scala.Tuple2;
-
-/**
- * Performs Rollback of Hoodie Tables.
- */
-public class ListingBasedRollbackHelper implements Serializable {
-
- private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class);
-
- private final HoodieTableMetaClient metaClient;
- private final HoodieWriteConfig config;
-
- public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
- this.metaClient = metaClient;
- this.config = config;
- }
-
- /**
- * Performs all rollback actions that we have collected in parallel.
- */
- public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
- int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
- context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions");
- JavaPairRDD<String, HoodieRollbackStat> partitionPathRollbackStatsPairRDD = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, sparkPartitions, true);
- return partitionPathRollbackStatsPairRDD.reduceByKey(RollbackUtils::mergeRollbackStat).map(Tuple2::_2).collect();
- }
-
- /**
- * Collect all file info that needs to be rollbacked.
- */
- public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
- int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
- context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade");
- JavaPairRDD<String, HoodieRollbackStat> partitionPathRollbackStatsPairRDD = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, sparkPartitions, false);
- return partitionPathRollbackStatsPairRDD.map(Tuple2::_2).collect();
- }
-
- /**
- * May be delete interested files and collect stats or collect stats only.
- *
- * @param context instance of {@link HoodieEngineContext} to use.
- * @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested.
- * @param rollbackRequests List of {@link ListingBasedRollbackRequest} to be operated on.
- * @param sparkPartitions number of spark partitions to use for parallelism.
- * @param doDelete {@code true} if deletion has to be done. {@code false} if only stats are to be collected w/o performing any deletes.
- * @return stats collected with or w/o actual deletions.
- */
- JavaPairRDD<String, HoodieRollbackStat> maybeDeleteAndCollectStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests,
- int sparkPartitions, boolean doDelete) {
- JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
- return jsc.parallelize(rollbackRequests, sparkPartitions).mapToPair(rollbackRequest -> {
- switch (rollbackRequest.getType()) {
- case DELETE_DATA_FILES_ONLY: {
- final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
- rollbackRequest.getPartitionPath(), doDelete);
- return new Tuple2<>(rollbackRequest.getPartitionPath(),
- HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
- .withDeletedFileResults(filesToDeletedStatus).build());
- }
- case DELETE_DATA_AND_LOG_FILES: {
- final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
- return new Tuple2<>(rollbackRequest.getPartitionPath(),
- HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
- .withDeletedFileResults(filesToDeletedStatus).build());
- }
- case APPEND_ROLLBACK_BLOCK: {
- String fileId = rollbackRequest.getFileId().get();
- String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
-
- // collect all log files that is supposed to be deleted with this rollback
- Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
- FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
- fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
- .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
-
- Writer writer = null;
- try {
- writer = HoodieLogFormat.newWriterBuilder()
- .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
- .withFileId(fileId)
- .overBaseCommit(latestBaseInstant)
- .withFs(metaClient.getFs())
- .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-
- // generate metadata
- if (doDelete) {
- Map<HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
- // if update belongs to an existing log file
- writer.appendBlock(new HoodieCommandBlock(header));
- }
- } catch (IOException | InterruptedException io) {
- throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
- } finally {
- try {
- if (writer != null) {
- writer.close();
- }
- } catch (IOException io) {
- throw new HoodieIOException("Error appending rollback block..", io);
- }
- }
-
- // This step is intentionally done after writer is closed. Guarantees that
- // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
- // cloud-storage : HUDI-168
- Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
- metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
- 1L
- );
-
- return new Tuple2<>(rollbackRequest.getPartitionPath(),
- HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
- .withRollbackBlockAppendResults(filesToNumBlocksRollback)
- .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build());
- }
- default:
- throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
- }
- });
- }
-
- /**
- * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
- */
- private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
- String commit, String partitionPath, boolean doDelete) throws IOException {
- LOG.info("Cleaning path " + partitionPath);
- String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
- SerializablePathFilter filter = (path) -> {
- if (path.toString().endsWith(basefileExtension)) {
- String fileCommitTime = FSUtils.getCommitTime(path.getName());
- return commit.equals(fileCommitTime);
- } else if (FSUtils.isLogFile(path)) {
- // Since the baseCommitTime is the only commit for new log files, it's okay here
- String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
- return commit.equals(fileCommitTime);
- }
- return false;
- };
-
- final Map<FileStatus, Boolean> results = new HashMap<>();
- FileSystem fs = metaClient.getFs();
- FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
- for (FileStatus file : toBeDeleted) {
- if (doDelete) {
- boolean success = fs.delete(file.getPath(), false);
- results.put(file, success);
- LOG.info("Delete file " + file.getPath() + "\t" + success);
- } else {
- results.put(file, true);
- }
- }
- return results;
- }
-
- /**
- * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
- */
- private Map<FileStatus, Boolean> deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
- String commit, String partitionPath, boolean doDelete) throws IOException {
- final Map<FileStatus, Boolean> results = new HashMap<>();
- LOG.info("Cleaning path " + partitionPath);
- FileSystem fs = metaClient.getFs();
- String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
- PathFilter filter = (path) -> {
- if (path.toString().contains(basefileExtension)) {
- String fileCommitTime = FSUtils.getCommitTime(path.getName());
- return commit.equals(fileCommitTime);
- }
- return false;
- };
- FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
- for (FileStatus file : toBeDeleted) {
- if (doDelete) {
- boolean success = fs.delete(file.getPath(), false);
- results.put(file, success);
- LOG.info("Delete file " + file.getPath() + "\t" + success);
- } else {
- results.put(file, true);
- }
- }
- return results;
- }
-
- private Map<HeaderMetadataType, String> generateHeader(String commit) {
- // generate metadata
- Map<HeaderMetadataType, String> header = new HashMap<>(3);
- header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
- header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit);
- header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,
- String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
- return header;
- }
-
- public interface SerializablePathFilter extends PathFilter, Serializable {
-
- }
-}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java
deleted file mode 100644
index 611ec21..0000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-
-import org.apache.spark.api.java.JavaRDD;
-
-import java.util.List;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class SparkCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload> extends
- BaseCopyOnWriteRollbackActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
- public SparkCopyOnWriteRollbackActionExecutor(HoodieSparkEngineContext context,
- HoodieWriteConfig config,
- HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
- String instantTime,
- HoodieInstant commitInstant,
- boolean deleteInstants) {
- super(context, config, table, instantTime, commitInstant, deleteInstants);
- }
-
- public SparkCopyOnWriteRollbackActionExecutor(HoodieSparkEngineContext context,
- HoodieWriteConfig config,
- HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
- String instantTime,
- HoodieInstant commitInstant,
- boolean deleteInstants,
- boolean skipTimelinePublish,
- boolean useMarkerBasedStrategy) {
- super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
- }
-
- @Override
- protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() {
- if (useMarkerBasedStrategy) {
- return new SparkMarkerBasedRollbackStrategy(table, context, config, instantTime);
- } else {
- return this::executeRollbackUsingFileListing;
- }
- }
-
- @Override
- protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
- List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context,
- table.getMetaClient().getBasePath(), config);
- return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
- }
-}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java
deleted file mode 100644
index 0adacd2..0000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieRollbackException;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
-import org.apache.hudi.table.marker.WriteMarkers;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import scala.Tuple2;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class SparkMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> extends AbstractMarkerBasedRollbackStrategy<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
- public SparkMarkerBasedRollbackStrategy(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
- super(table, context, config, instantTime);
- }
-
- @Override
- public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
- JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
- try {
- List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
- table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism());
- int parallelism = Math.max(Math.min(markerPaths.size(), config.getRollbackParallelism()), 1);
- jsc.setJobGroup(this.getClass().getSimpleName(), "Rolling back using marker files");
- return jsc.parallelize(markerPaths, parallelism)
- .map(markerFilePath -> {
- String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
- IOType type = IOType.valueOf(typeStr);
- switch (type) {
- case MERGE:
- return undoMerge(WriteMarkers.stripMarkerSuffix(markerFilePath));
- case APPEND:
- return undoAppend(WriteMarkers.stripMarkerSuffix(markerFilePath), instantToRollback);
- case CREATE:
- return undoCreate(WriteMarkers.stripMarkerSuffix(markerFilePath));
- default:
- throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
- }
- })
- .mapToPair(rollbackStat -> new Tuple2<>(rollbackStat.getPartitionPath(), rollbackStat))
- .reduceByKey(RollbackUtils::mergeRollbackStat)
- .map(Tuple2::_2).collect();
- } catch (Exception e) {
- throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
- }
- }
-
- protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException {
- // collect all log files that is supposed to be deleted with this rollback
- return FSUtils.getAllLogFiles(table.getMetaClient().getFs(),
- FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime)
- .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
- }
-}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMergeOnReadRollbackActionExecutor.java
deleted file mode 100644
index 9486362..0000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMergeOnReadRollbackActionExecutor.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.table.HoodieTable;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-
-import java.io.IOException;
-import java.util.List;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class SparkMergeOnReadRollbackActionExecutor<T extends HoodieRecordPayload> extends
- BaseMergeOnReadRollbackActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
- public SparkMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
- HoodieWriteConfig config,
- HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
- String instantTime,
- HoodieInstant commitInstant,
- boolean deleteInstants) {
- super(context, config, table, instantTime, commitInstant, deleteInstants);
- }
-
- public SparkMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
- HoodieWriteConfig config,
- HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
- String instantTime,
- HoodieInstant commitInstant,
- boolean deleteInstants,
- boolean skipTimelinePublish,
- boolean useMarkerBasedStrategy) {
- super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
- }
-
- @Override
- protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() {
- if (useMarkerBasedStrategy) {
- return new SparkMarkerBasedRollbackStrategy(table, context, config, instantTime);
- } else {
- return this::executeRollbackUsingFileListing;
- }
- }
-
- @Override
- protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant resolvedInstant) {
- List<ListingBasedRollbackRequest> rollbackRequests;
- JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
- try {
- rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(resolvedInstant, table, context);
- } catch (IOException e) {
- throw new HoodieIOException("Error generating rollback requests by file listing.", e);
- }
- return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, resolvedInstant, rollbackRequests);
- }
-}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index bc1f3c3..810733c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -83,8 +83,8 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
HoodieInstant needRollBackInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "002");
// execute CopyOnWriteRollbackActionExecutor with filelisting mode
- SparkCopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new SparkCopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true);
- assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof SparkMarkerBasedRollbackStrategy);
+ CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true);
+ assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
List<HoodieRollbackStat> hoodieRollbackStats = copyOnWriteRollbackActionExecutor.executeRollback();
// assert hoodieRollbackStats
@@ -162,11 +162,11 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
commitInstant = table.getCompletedCommitTimeline().lastInstant().get();
}
- SparkCopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new SparkCopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false);
+ CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false);
if (!isUsingMarkers) {
- assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof SparkMarkerBasedRollbackStrategy);
+ assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
} else {
- assertTrue(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof SparkMarkerBasedRollbackStrategy);
+ assertTrue(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
}
Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = copyOnWriteRollbackActionExecutor.execute().getPartitionMetadata();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
index 75e6a7a..5d269cf 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -89,7 +89,7 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
//2. rollback
HoodieInstant rollBackInstant = new HoodieInstant(isUsingMarkers, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
- SparkMergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new SparkMergeOnReadRollbackActionExecutor(
+ MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
context,
cfg,
table,
@@ -98,9 +98,9 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
true);
// assert is filelist mode
if (!isUsingMarkers) {
- assertFalse(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof SparkMarkerBasedRollbackStrategy);
+ assertFalse(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
} else {
- assertTrue(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof SparkMarkerBasedRollbackStrategy);
+ assertTrue(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
}
//3. assert the rollback stat
@@ -145,15 +145,15 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
public void testFailForCompletedInstants() {
Assertions.assertThrows(IllegalArgumentException.class, () -> {
HoodieInstant rollBackInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
- new SparkMergeOnReadRollbackActionExecutor(
- context,
- getConfigBuilder().build(),
- getHoodieTable(metaClient, getConfigBuilder().build()),
- "003",
- rollBackInstant,
- true,
- true,
- true);
+ new MergeOnReadRollbackActionExecutor(
+ context,
+ getConfigBuilder().build(),
+ getHoodieTable(metaClient, getConfigBuilder().build()),
+ "003",
+ rollBackInstant,
+ true,
+ true,
+ true);
});
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
index 6e67386..94fa697 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
@@ -32,7 +32,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
-import org.apache.hudi.table.action.rollback.SparkMarkerBasedRollbackStrategy;
+import org.apache.hudi.table.action.rollback.MarkerBasedRollbackStrategy;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hadoop.fs.FileStatus;
@@ -93,7 +93,7 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
.withMarkerFile("partA", f2, IOType.CREATE);
// when
- List<HoodieRollbackStat> stats = new SparkMarkerBasedRollbackStrategy(HoodieSparkTable.create(getConfig(), context, metaClient), context, getConfig(), "002")
+ List<HoodieRollbackStat> stats = new MarkerBasedRollbackStrategy(HoodieSparkTable.create(getConfig(), context, metaClient), context, getConfig(), "002")
.execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"));
// then: ensure files are deleted correctly, non-existent files reported as failed deletes
@@ -176,7 +176,7 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
writeStatuses.collect();
// rollback 2nd commit and ensure stats reflect the info.
- return new SparkMarkerBasedRollbackStrategy(HoodieSparkTable.create(writeConfig, context, metaClient), context, writeConfig, "003")
+ return new MarkerBasedRollbackStrategy(HoodieSparkTable.create(writeConfig, context, metaClient), context, writeConfig, "003")
.execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "002"));
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index 0128ce5..8ea6a43 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.engine;
import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
@@ -56,6 +57,9 @@ public abstract class HoodieEngineContext {
public abstract <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism);
+ public abstract <I, K, V> List<V> mapToPairAndReduceByKey(
+ List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism);
+
public abstract <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism);
public abstract <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
index e804567..0aeb9d8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.engine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
@@ -37,6 +38,7 @@ import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWra
import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
+import static org.apache.hudi.common.function.FunctionWrapper.throwingReduceWrapper;
/**
* A java based engine context, use this implementation on the query engine integrations if needed.
@@ -57,6 +59,15 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext {
}
@Override
+ public <I, K, V> List<V> mapToPairAndReduceByKey(
+ List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) { // local engine ignores parallelism; uses a parallel stream
+ return data.stream().parallel().map(throwingMapToPairWrapper(mapToPairFunc)) // map each element to a (key, value) pair
+ .collect(Collectors.groupingBy(p -> p.getKey())).values().stream() // group values by key
+ .map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).get()) // reduce per key; get() is safe: groupingBy never yields an empty group
+ .collect(Collectors.toList());
+ }
+
+ @Override
public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(toList());
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java b/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java
index 405f57e..b729e48 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.function;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
+import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
@@ -70,4 +71,14 @@ public class FunctionWrapper {
}
};
}
+
+ public static <V> BinaryOperator<V> throwingReduceWrapper(SerializableBiFunction<V, V, V> throwingReduceFunction) {
+ return (v1, v2) -> {
+ try {
+ return throwingReduceFunction.apply(v1, v2);
+ } catch (Exception e) {
+ throw new HoodieException("Error occurs when executing mapToPair", e);
+ }
+ };
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/function/SerializableBiFunction.java b/hudi-common/src/main/java/org/apache/hudi/common/function/SerializableBiFunction.java
new file mode 100644
index 0000000..940396c
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/function/SerializableBiFunction.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.function;
+
+import java.io.Serializable;
+
+/**
+ * A serializable variant of {@code java.util.function.BiFunction}: two args, one result.
+ *
+ * @param <T> the type of the first argument to the function
+ * @param <U> the type of the second argument to the function
+ * @param <R> the type of the result of the function
+ */
+@FunctionalInterface
+public interface SerializableBiFunction<T, U, R> extends Serializable {
+ R apply(T t, U u);
+}