Posted to commits@hudi.apache.org by vi...@apache.org on 2020/04/26 01:26:51 UTC
[incubator-hudi] branch master updated: [HUDI-785] Refactor
compaction/savepoint execution based on ActionExecutor abstraction (#1548)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 19ca0b5 [HUDI-785] Refactor compaction/savepoint execution based on ActionExecutor abstraction (#1548)
19ca0b5 is described below
commit 19ca0b56296bf89c611406bb86c3626f87d562ee
Author: vinoth chandar <vi...@users.noreply.github.com>
AuthorDate: Sat Apr 25 18:26:44 2020 -0700
[HUDI-785] Refactor compaction/savepoint execution based on ActionExecutor abstraction (#1548)
- Savepoint and compaction classes moved to table.action.* packages
- HoodieWriteClient#savepoint(...) returns void
- Renamed HoodieCommitArchiveLog -> HoodieTimelineArchiveLog
- Fixed tests to account for the additional validation now performed
- Moved helper code into CompactHelpers and SavepointHelpers
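The most visible change for callers: HoodieWriteClient#savepoint(...) and #restoreToSavepoint(...) now return void and signal failure by throwing (HoodieSavepointException for savepoint creation) instead of returning a boolean. A minimal caller-side sketch of the migration, mirroring the SavepointsCommand hunk below; "client", "commitTime", "user" and "comments" are assumed to be in scope:

    // Before this commit, callers branched on a boolean:
    //   if (client.savepoint(commitTime, user, comments)) { ... } else { ... }
    // After this commit, failures surface as exceptions:
    try {
      client.savepoint(commitTime, user, comments);
      // savepoint created; refresh any cached meta client/timeline state here
    } catch (HoodieSavepointException e) {
      // savepoint could not be created; inspect or log the cause
    }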
---
.../hudi/cli/commands/CompactionCommand.java | 2 +-
.../hudi/cli/commands/SavepointsCommand.java | 8 +-
.../org/apache/hudi/cli/commands/SparkMain.java | 7 +-
.../java/org/apache/hudi/cli/utils/CommitUtil.java | 2 +-
.../cli/commands/TestArchivedCommitsCommand.java | 4 +-
.../hudi/cli/common/HoodieTestCommitUtilities.java | 4 +-
.../hudi/client/AbstractHoodieWriteClient.java | 38 +-
.../apache/hudi/client/CompactionAdminClient.java | 12 +-
.../org/apache/hudi/client/HoodieWriteClient.java | 484 ++++-----------------
.../apache/hudi/config/HoodieCompactionConfig.java | 4 +-
.../org/apache/hudi/config/HoodieWriteConfig.java | 2 +-
.../apache/hudi/table/HoodieCopyOnWriteTable.java | 18 +-
.../apache/hudi/table/HoodieMergeOnReadTable.java | 55 +--
.../java/org/apache/hudi/table/HoodieTable.java | 95 ++--
...chiveLog.java => HoodieTimelineArchiveLog.java} | 8 +-
.../action/{commit => }/HoodieWriteMetadata.java | 2 +-
.../hudi/table/action/clean/CleanPlanner.java | 28 +-
.../action/commit/BaseCommitActionExecutor.java | 4 +-
.../commit/BulkInsertCommitActionExecutor.java | 1 +
.../hudi/table/action/commit/BulkInsertHelper.java | 1 +
.../BulkInsertPreppedCommitActionExecutor.java | 1 +
.../action/commit/DeleteCommitActionExecutor.java | 1 +
.../hudi/table/action/commit/DeleteHelper.java | 5 +-
.../action/commit/InsertCommitActionExecutor.java | 1 +
.../commit/InsertPreppedCommitActionExecutor.java | 1 +
.../action/commit/UpsertCommitActionExecutor.java | 1 +
.../commit/UpsertPreppedCommitActionExecutor.java | 1 +
.../hudi/table/action/commit/WriteHelper.java | 7 +-
.../hudi/table/action/compact/CompactHelpers.java | 70 +++
.../{ => action}/compact/HoodieCompactor.java | 2 +-
.../compact/HoodieMergeOnReadTableCompactor.java | 5 +-
.../{ => action}/compact/OperationResult.java | 2 +-
.../compact/RunCompactionActionExecutor.java | 93 ++++
.../compact/ScheduleCompactionActionExecutor.java | 121 ++++++
.../strategy/BoundedIOCompactionStrategy.java | 2 +-
.../BoundedPartitionAwareCompactionStrategy.java | 2 +-
.../compact/strategy/CompactionStrategy.java | 4 +-
.../strategy/DayBasedCompactionStrategy.java | 2 +-
.../LogFileSizeBasedCompactionStrategy.java | 2 +-
.../strategy/UnBoundedCompactionStrategy.java | 2 +-
.../UnBoundedPartitionAwareCompactionStrategy.java | 2 +-
.../BulkInsertDeltaCommitActionExecutor.java | 2 +-
...BulkInsertPreppedDeltaCommitActionExecutor.java | 2 +-
.../DeleteDeltaCommitActionExecutor.java | 2 +-
.../InsertDeltaCommitActionExecutor.java | 2 +-
.../InsertPreppedDeltaCommitActionExecutor.java | 2 +-
.../UpsertDeltaCommitActionExecutor.java | 2 +-
.../UpsertPreppedDeltaCommitActionExecutor.java | 2 +-
.../MergeOnReadRollbackActionExecutor.java | 2 +-
.../action/savepoint/SavepointActionExecutor.java | 115 +++++
.../table/action/savepoint/SavepointHelpers.java | 71 +++
.../org/apache/hudi/client/TestClientRollback.java | 2 +-
.../hudi/client/TestCompactionAdminClient.java | 2 +-
.../apache/hudi/io/TestHoodieCommitArchiveLog.java | 18 +-
.../{ => action}/compact/TestAsyncCompaction.java | 2 +-
.../{ => action}/compact/TestHoodieCompactor.java | 42 +-
.../strategy/TestHoodieCompactionStrategy.java | 2 +-
.../apache/hudi/common/model/HoodieTestUtils.java | 14 +
58 files changed, 789 insertions(+), 601 deletions(-)
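On the compaction path, the HoodieWriteClient hunks below show compact(String) dropping its "throws IOException" clause and scheduleCompactionAtInstant(...) delegating plan generation to HoodieTable, reporting success via the presence of a compaction plan. A sketch of the resulting caller flow, assuming an existing HoodieWriteClient "client" (error handling elided):

    // Schedule a compaction; an empty Option means there was nothing to compact.
    Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
    if (compactionInstant.isPresent()) {
      // compact(...) no longer declares IOException after this refactor.
      JavaRDD<WriteStatus> statuses = client.compact(compactionInstant.get());
      // With auto-commit disabled, inspect statuses and call commitCompaction(...) explicitly.
    }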
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
index a4c70da..9b55fe2 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -42,7 +42,7 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.table.compact.OperationResult;
+import org.apache.hudi.table.action.compact.OperationResult;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hadoop.fs.FSDataInputStream;
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
index dac0021..b5bc349 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.spark.api.java.JavaSparkContext;
@@ -79,12 +80,13 @@ public class SavepointsCommand implements CommandMarker {
String result;
try (JavaSparkContext jsc = SparkUtil.initJavaSparkConf("Create Savepoint")) {
HoodieWriteClient client = createHoodieClient(jsc, metaClient.getBasePath());
- if (client.savepoint(commitTime, user, comments)) {
+ try {
+ client.savepoint(commitTime, user, comments);
// Refresh the current
refreshMetaClient();
result = String.format("The commit \"%s\" has been savepointed.", commitTime);
- } else {
- result = String.format("Failed: Could not savepoint commit \"%s\".", commitTime);
+ } catch (HoodieSavepointException se) {
+ result = String.format("Failed: Could not create savepoint \"%s\".", commitTime);
}
}
return result;
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 3534cc5..5aa2255 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -27,7 +27,7 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.table.compact.strategy.UnBoundedCompactionStrategy;
+import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
import org.apache.hudi.utilities.HDFSParquetImporter;
import org.apache.hudi.utilities.HDFSParquetImporter.Config;
import org.apache.hudi.utilities.HoodieCleaner;
@@ -283,10 +283,11 @@ public class SparkMain {
private static int rollbackToSavepoint(JavaSparkContext jsc, String savepointTime, String basePath) throws Exception {
HoodieWriteClient client = createHoodieClient(jsc, basePath);
- if (client.restoreToSavepoint(savepointTime)) {
+ try {
+ client.restoreToSavepoint(savepointTime);
LOG.info(String.format("The commit \"%s\" rolled back.", savepointTime));
return 0;
- } else {
+ } catch (Exception e) {
LOG.info(String.format("The commit \"%s\" failed to roll back.", savepointTime));
return -1;
}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
index 1cbe2e6..5a1c457 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
@@ -39,7 +39,7 @@ public class CommitUtil {
public static long countNewRecords(HoodieTableMetaClient target, List<String> commitsToCatchup) throws IOException {
long totalNew = 0;
- HoodieTimeline timeline = target.getActiveTimeline().reload().getCommitTimeline().filterCompletedInstants();
+ HoodieTimeline timeline = target.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
for (String commit : commitsToCatchup) {
HoodieCommitMetadata c = HoodieCommitMetadata.fromBytes(
timeline.getInstantDetails(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commit)).get(),
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
index 56dc5e9..b2261ef 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
@@ -30,7 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieCommitArchiveLog;
+import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.junit.After;
import org.junit.Before;
@@ -91,7 +91,7 @@ public class TestArchivedCommitsCommand extends AbstractShellIntegrationTest {
metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
// archive
- HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
+ HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
archiveLog.archiveIfRequired(jsc);
}
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/common/HoodieTestCommitUtilities.java b/hudi-cli/src/test/java/org/apache/hudi/cli/common/HoodieTestCommitUtilities.java
index bfd0f0f..75a02c8 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/common/HoodieTestCommitUtilities.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/common/HoodieTestCommitUtilities.java
@@ -20,7 +20,7 @@ package org.apache.hudi.cli.common;
import org.apache.hudi.avro.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.table.HoodieCommitArchiveLog;
+import org.apache.hudi.table.HoodieTimelineArchiveLog;
import java.util.LinkedHashMap;
import java.util.List;
@@ -36,7 +36,7 @@ public class HoodieTestCommitUtilities {
*/
public static org.apache.hudi.avro.model.HoodieCommitMetadata convertAndOrderCommitMetadata(
HoodieCommitMetadata hoodieCommitMetadata) {
- return orderCommitMetadata(HoodieCommitArchiveLog.convertCommitMetadata(hoodieCommitMetadata));
+ return orderCommitMetadata(HoodieTimelineArchiveLog.convertCommitMetadata(hoodieCommitMetadata));
}
/**
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index d712d57..8d730e2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -20,7 +20,6 @@ package org.apache.hudi.client;
import com.codahale.metrics.Timer;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
-import org.apache.hudi.client.utils.SparkConfigUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieRollingStat;
@@ -57,7 +56,6 @@ import java.util.Map;
public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
private static final Logger LOG = LogManager.getLogger(AbstractHoodieWriteClient.class);
- private static final String UPDATE_STR = "update";
private final transient HoodieMetrics metrics;
private final transient HoodieIndex<T> index;
@@ -96,32 +94,6 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
return commit(instantTime, writeStatuses, extraMetadata, metaClient.getCommitActionType());
}
- protected JavaRDD<WriteStatus> updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, HoodieTable<T> table,
- String instantTime) {
- // cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future
- // RDD actions that are performed after updating the index.
- writeStatusRDD = writeStatusRDD.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
- Timer.Context indexTimer = metrics.getIndexCtx();
- // Update the index back
- JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, jsc, table);
- metrics.updateIndexMetrics(UPDATE_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
- // Trigger the insert and collect statuses
- commitOnAutoCommit(instantTime, statuses, table.getMetaClient().getCommitActionType());
- return statuses;
- }
-
- protected void commitOnAutoCommit(String instantTime, JavaRDD<WriteStatus> resultRDD, String actionType) {
- if (config.shouldAutoCommit()) {
- LOG.info("Auto commit enabled: Committing " + instantTime);
- boolean commitResult = commit(instantTime, resultRDD, Option.empty(), actionType);
- if (!commitResult) {
- throw new HoodieCommitException("Failed to commit " + instantTime);
- }
- } else {
- LOG.info("Auto commit disabled for " + instantTime);
- }
- }
-
private boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
Option<Map<String, String>> extraMetadata, String actionType) {
@@ -131,7 +103,6 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
-
List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
updateMetadataAndRollingStats(actionType, metadata, stats);
@@ -149,10 +120,8 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
try {
activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-
postCommit(metadata, instantTime, extraMetadata);
emitCommitMetrics(instantTime, metadata, actionType);
-
LOG.info("Committed " + instantTime);
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
@@ -182,8 +151,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
* @param instantTime Instant Time
* @param extraMetadata Additional Metadata passed by user
*/
- protected abstract void postCommit(HoodieCommitMetadata metadata, String instantTime,
- Option<Map<String, String>> extraMetadata);
+ protected abstract void postCommit(HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata);
/**
* Finalize Write operation.
@@ -260,7 +228,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
protected HoodieTable getTableAndInitCtx(WriteOperationType operationType) {
HoodieTableMetaClient metaClient = createMetaClient(true);
if (operationType == WriteOperationType.DELETE) {
- setWriteSchemaFromLastInstant(metaClient);
+ setWriteSchemaForDeletes(metaClient);
}
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable table = HoodieTable.create(metaClient, config, jsc);
@@ -275,7 +243,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
/**
* Sets write schema from last instant since deletes may not have schema set in the config.
*/
- private void setWriteSchemaFromLastInstant(HoodieTableMetaClient metaClient) {
+ private void setWriteSchemaForDeletes(HoodieTableMetaClient metaClient) {
try {
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
Option<HoodieInstant> lastInstant =
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
index 9f9ce12..627348c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
@@ -18,9 +18,10 @@
package org.apache.hudi.client;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
@@ -41,10 +42,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.table.compact.OperationResult;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
+import org.apache.hudi.table.action.compact.OperationResult;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
@@ -71,10 +69,6 @@ public class CompactionAdminClient extends AbstractHoodieClient {
super(jsc, HoodieWriteConfig.newBuilder().withPath(basePath).build());
}
- public CompactionAdminClient(JavaSparkContext jsc, String basePath, Option<EmbeddedTimelineService> timelineServer) {
- super(jsc, HoodieWriteConfig.newBuilder().withPath(basePath).build(), timelineServer);
- }
-
/**
* Validate all compaction operations in a compaction plan. Verifies the file-slices are consistent with corresponding
* compaction operations.
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index c7de8df..5ae7ca0 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -19,64 +19,48 @@
package org.apache.hudi.client;
import com.codahale.metrics.Timer;
-import org.apache.avro.Schema;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
-import org.apache.hudi.client.utils.SparkConfigUtils;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
-import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
-import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
-import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieRestoreException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
-import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.HoodieMetrics;
-import org.apache.hudi.table.HoodieCommitArchiveLog;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.compact.CompactHelpers;
+import org.apache.hudi.table.action.savepoint.SavepointHelpers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
-import scala.Tuple2;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -171,9 +155,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT);
- validateSchema(table, true);
+ table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT);
- HoodieWriteMetadata result = table.upsert(jsc,instantTime, records);
+ HoodieWriteMetadata result = table.upsert(jsc, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
}
@@ -191,7 +175,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED);
- validateSchema(table, true);
+ table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT_PREPPED);
HoodieWriteMetadata result = table.upsertPrepped(jsc,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
@@ -209,7 +193,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT);
- validateSchema(table, false);
+ table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT);
HoodieWriteMetadata result = table.insert(jsc,instantTime, records);
return postWrite(result, instantTime, table);
@@ -228,7 +212,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED);
- validateSchema(table, false);
+ table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT_PREPPED);
HoodieWriteMetadata result = table.insertPrepped(jsc,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
@@ -267,6 +251,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String instantTime,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT);
+ table.validateInsertSchema();
setOperationType(WriteOperationType.BULK_INSERT);
HoodieWriteMetadata result = table.bulkInsert(jsc,instantTime, records, bulkInsertPartitioner);
return postWrite(result, instantTime, table);
@@ -291,6 +276,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED);
+ table.validateInsertSchema();
setOperationType(WriteOperationType.BULK_INSERT_PREPPED);
HoodieWriteMetadata result = table.bulkInsertPrepped(jsc,instantTime, preppedRecords, bulkInsertPartitioner);
return postWrite(result, instantTime, table);
@@ -344,12 +330,12 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
// Do an inline compaction if enabled
if (config.isInlineCompaction()) {
metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
- forceCompact(extraMetadata);
+ inlineCompact(extraMetadata);
} else {
metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
}
// We cannot have unbounded commit files. Archive commits if we have to archive
- HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(config, createMetaClient(true));
+ HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, createMetaClient(true));
archiveLog.archiveIfRequired(jsc);
if (config.isAutoClean()) {
// Call clean to cleanup if there is anything to cleanup after the commit,
@@ -364,35 +350,25 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
}
/**
- * Savepoint a specific commit. Latest version of data files as of the passed in commitTime will be referenced in the
- * savepoint and will never be cleaned. The savepointed commit will never be rolledback or archived.
- * <p>
- * This gives an option to rollback the state to the savepoint anytime. Savepoint needs to be manually created and
- * deleted.
- * <p>
- * Savepoint should be on a commit that could not have been cleaned.
+ * Create a savepoint based on the latest commit action on the timeline.
*
* @param user - User creating the savepoint
* @param comment - Comment for the savepoint
- * @return true if the savepoint was created successfully
*/
- public boolean savepoint(String user, String comment) {
+ public void savepoint(String user, String comment) {
HoodieTable<T> table = HoodieTable.create(config, jsc);
if (table.getCompletedCommitsTimeline().empty()) {
throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
}
- if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
- throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
- }
String latestCommit = table.getCompletedCommitsTimeline().lastInstant().get().getTimestamp();
LOG.info("Savepointing latest commit " + latestCommit);
- return savepoint(latestCommit, user, comment);
+ savepoint(latestCommit, user, comment);
}
/**
- * Savepoint a specific commit. Latest version of data files as of the passed in instantTime will be referenced in the
- * savepoint and will never be cleaned. The savepointed commit will never be rolledback or archived.
+ * Savepoint a specific commit instant time. Latest version of data files as of the passed in instantTime
+ * will be referenced in the savepoint and will never be cleaned. The savepointed commit will never be rolledback or archived.
* <p>
* This gives an option to rollback the state to the savepoint anytime. Savepoint needs to be manually created and
* deleted.
@@ -402,60 +378,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param instantTime - commit that should be savepointed
* @param user - User creating the savepoint
* @param comment - Comment for the savepoint
- * @return true if the savepoint was created successfully
*/
- public boolean savepoint(String instantTime, String user, String comment) {
+ public void savepoint(String instantTime, String user, String comment) {
HoodieTable<T> table = HoodieTable.create(config, jsc);
- if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
- throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
- }
- Option<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant();
-
- HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime);
- if (!table.getCompletedCommitsTimeline().containsInstant(commitInstant)) {
- throw new HoodieSavepointException("Could not savepoint non-existing commit " + commitInstant);
- }
-
- try {
- // Check the last commit that was not cleaned and check if savepoint time is > that commit
- String lastCommitRetained;
- if (cleanInstant.isPresent()) {
- HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
- .deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get());
- lastCommitRetained = cleanMetadata.getEarliestCommitToRetain();
- } else {
- lastCommitRetained = table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp();
- }
-
- // Cannot allow savepoint time on a commit that could have been cleaned
- ValidationUtils.checkArgument(
- HoodieTimeline.compareTimestamps(instantTime, lastCommitRetained, HoodieTimeline.GREATER_OR_EQUAL),
- "Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
-
- Map<String, List<String>> latestFilesMap = jsc
- .parallelize(FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath(),
- config.shouldAssumeDatePartitioning()))
- .mapToPair((PairFunction<String, String, List<String>>) partitionPath -> {
- // Scan all partitions files with this commit time
- LOG.info("Collecting latest files in partition path " + partitionPath);
- BaseFileOnlyView view = table.getBaseFileOnlyView();
- List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
- .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
- return new Tuple2<>(partitionPath, latestFiles);
- }).collectAsMap();
-
- HoodieSavepointMetadata metadata = TimelineMetadataUtils.convertSavepointMetadata(user, comment, latestFilesMap);
- // Nothing to save in the savepoint
- table.getActiveTimeline().createNewInstant(
- new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, instantTime));
- table.getActiveTimeline()
- .saveAsComplete(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, instantTime),
- TimelineMetadataUtils.serializeSavepointMetadata(metadata));
- LOG.info("Savepoint " + instantTime + " created");
- return true;
- } catch (IOException e) {
- throw new HoodieSavepointException("Failed to savepoint " + instantTime, e);
- }
+ table.savepoint(jsc, instantTime, user, comment);
}
/**
@@ -467,88 +393,24 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/
public void deleteSavepoint(String savepointTime) {
HoodieTable<T> table = HoodieTable.create(config, jsc);
- if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
- throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
- }
- HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-
- HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
- boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint);
- if (!isSavepointPresent) {
- LOG.warn("No savepoint present " + savepointTime);
- return;
- }
-
- activeTimeline.revertToInflight(savePoint);
- activeTimeline.deleteInflight(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, savepointTime));
- LOG.info("Savepoint " + savepointTime + " deleted");
+ SavepointHelpers.deleteSavepoint(table, savepointTime);
}
/**
- * Delete a compaction request that is pending.
+ * Restore the data to the savepoint.
*
- * NOTE - This is an Admin operation. With async compaction, this is expected to be called with async compaction and
- * write shutdown. Otherwise, async compactor could fail with errors
- *
- * @param compactionTime - delete the compaction time
- */
- private void deleteRequestedCompaction(String compactionTime) {
- HoodieTable<T> table = HoodieTable.create(config, jsc);
- HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
- HoodieInstant compactionRequestedInstant =
- new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime);
- boolean isCompactionInstantInRequestedState =
- table.getActiveTimeline().filterPendingCompactionTimeline().containsInstant(compactionRequestedInstant);
- HoodieTimeline commitTimeline = table.getCompletedCommitTimeline();
- if (commitTimeline.empty() && !commitTimeline.findInstantsAfter(compactionTime, Integer.MAX_VALUE).empty()) {
- throw new HoodieRollbackException(
- "Found commits after time :" + compactionTime + ", please rollback greater commits first");
- }
- if (isCompactionInstantInRequestedState) {
- activeTimeline.deleteCompactionRequested(compactionRequestedInstant);
- } else {
- throw new IllegalArgumentException("Compaction is not in requested state " + compactionTime);
- }
- LOG.info("Compaction " + compactionTime + " deleted");
- }
-
- /**
- * Restore the state to the savepoint. WARNING: This rollsback recent commits and deleted data files. Queries
- * accessing the files will mostly fail. This should be done during a downtime.
+ * WARNING: This rolls back recent commits, deletes data files, and removes pending compactions created after the
+ * savepoint time. Queries accessing the files will mostly fail. This is expected to be a manual operation and no
+ * concurrent write or compaction is expected to be running.
*
* @param savepointTime - savepoint time to rollback to
- * @return true if the savepoint was rollecback to successfully
*/
- public boolean restoreToSavepoint(String savepointTime) {
+ public void restoreToSavepoint(String savepointTime) {
HoodieTable<T> table = HoodieTable.create(config, jsc);
- HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-
- // Rollback to savepoint is expected to be a manual operation and no concurrent write or compaction is expected
- // to be running. Rollback to savepoint also removes any pending compaction actions that are generated after
- // savepoint time. Allowing pending compaction to be retained is not safe as those workload could be referencing
- // file-slices that will be rolled-back as part of this operation
- HoodieTimeline instantTimeline = table.getMetaClient().getCommitsAndCompactionTimeline();
-
- HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
- boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint);
- if (!isSavepointPresent) {
- throw new HoodieRollbackException("No savepoint for instantTime " + savepointTime);
- }
-
- List<String> commitsToRollback = instantTimeline.findInstantsAfter(savepointTime, Integer.MAX_VALUE).getInstants()
- .map(HoodieInstant::getTimestamp).collect(Collectors.toList());
- LOG.info("Rolling back commits " + commitsToRollback);
-
+ SavepointHelpers.validateSavepointPresence(table, savepointTime);
restoreToInstant(savepointTime);
-
- // Make sure the rollback was successful
- Option<HoodieInstant> lastInstant =
- activeTimeline.reload().getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants().lastInstant();
- ValidationUtils.checkArgument(lastInstant.isPresent());
- ValidationUtils.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime),
- savepointTime + "is not the last commit after rolling back " + commitsToRollback + ", last commit was "
- + lastInstant.get().getTimestamp());
- return true;
+ SavepointHelpers.validateSavepointRestore(table, savepointTime);
}
/**
@@ -574,7 +436,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
}
return true;
} else {
- LOG.info("Cannot find instant " + commitInstantTime + " in the timeline, for rollback");
+ LOG.warn("Cannot find instant " + commitInstantTime + " in the timeline, for rollback");
return false;
}
} catch (Exception e) {
@@ -626,9 +488,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
LOG.info("Cleaner started");
final Timer.Context context = metrics.getCleanCtx();
-
HoodieCleanMetadata metadata = HoodieTable.create(config, jsc).clean(jsc, cleanInstantTime);
-
if (context != null) {
long durationMs = metrics.getDurationInMs(context.stop());
metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
@@ -636,7 +496,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
+ " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
+ " cleanerElaspsedMs" + durationMs);
}
-
return metadata;
}
@@ -692,11 +551,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*
* @param extraMetadata Extra Metadata to be stored
*/
- public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws IOException {
+ public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
String instantTime = HoodieActiveTimeline.createNewInstantTime();
- LOG.info("Generate a new instant time " + instantTime);
- boolean notEmpty = scheduleCompactionAtInstant(instantTime, extraMetadata);
- return notEmpty ? Option.of(instantTime) : Option.empty();
+ return scheduleCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
}
/**
@@ -705,35 +562,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param instantTime Compaction Instant Time
* @param extraMetadata Extra Metadata to be stored
*/
- public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata)
- throws IOException {
- HoodieTableMetaClient metaClient = createMetaClient(true);
- // if there are inflight writes, their instantTime must not be less than that of compaction instant time
- metaClient.getCommitsTimeline().filterPendingExcludingCompaction().firstInstant().ifPresent(earliestInflight -> {
- ValidationUtils.checkArgument(
- HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), instantTime, HoodieTimeline.GREATER),
- "Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
- + ", Compaction scheduled at " + instantTime);
- });
- // Committed and pending compaction instants should have strictly lower timestamps
- List<HoodieInstant> conflictingInstants = metaClient
- .getActiveTimeline().getCommitsAndCompactionTimeline().getInstants().filter(instant -> HoodieTimeline
- .compareTimestamps(instant.getTimestamp(), instantTime, HoodieTimeline.GREATER_OR_EQUAL))
- .collect(Collectors.toList());
- ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
- "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
- + conflictingInstants);
- HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
- HoodieCompactionPlan workload = table.scheduleCompaction(jsc, instantTime);
- if (workload != null && (workload.getOperations() != null) && (!workload.getOperations().isEmpty())) {
- extraMetadata.ifPresent(workload::setExtraMetadata);
- HoodieInstant compactionInstant =
- new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
- metaClient.getActiveTimeline().saveToCompactionRequested(compactionInstant,
- TimelineMetadataUtils.serializeCompactionPlan(workload));
- return true;
- }
- return false;
+ public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+ LOG.info("Scheduling compaction at instant time :" + instantTime);
+ Option<HoodieCompactionPlan> plan = HoodieTable.create(config, jsc)
+ .scheduleCompaction(jsc, instantTime, extraMetadata);
+ return plan.isPresent();
}
/**
@@ -742,7 +575,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param compactionInstantTime Compaction Instant Time
* @return RDD of WriteStatus to inspect errors and counts
*/
- public JavaRDD<WriteStatus> compact(String compactionInstantTime) throws IOException {
+ public JavaRDD<WriteStatus> compact(String compactionInstantTime) {
return compact(compactionInstantTime, config.shouldAutoCommit());
}
@@ -754,24 +587,47 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param extraMetadata Extra Metadata to be stored
*/
public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus> writeStatuses,
- Option<Map<String, String>> extraMetadata) throws IOException {
- HoodieTableMetaClient metaClient = createMetaClient(true);
- HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
- HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
- HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(
- timeline.readCompactionPlanAsBytes(HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());
- // Merge extra meta-data passed by user with the one already in inflight compaction
- Option<Map<String, String>> mergedMetaData = extraMetadata.map(m -> {
- Map<String, String> merged = new HashMap<>();
- Map<String, String> extraMetaDataFromInstantFile = compactionPlan.getExtraMetadata();
- if (extraMetaDataFromInstantFile != null) {
- merged.putAll(extraMetaDataFromInstantFile);
+ Option<Map<String, String>> extraMetadata) throws IOException {
+ HoodieTable<T> table = HoodieTable.create(config, jsc);
+ HoodieCommitMetadata metadata = CompactHelpers.createCompactionMetadata(
+ table, compactionInstantTime, writeStatuses, config.getSchema());
+ extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
+ completeCompaction(metadata, writeStatuses, table, compactionInstantTime);
+ }
+
+ /**
+ * Commit Compaction and track metrics.
+ */
+ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses, HoodieTable<T> table,
+ String compactionCommitTime) {
+
+ List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
+ finalizeWrite(table, compactionCommitTime, writeStats);
+ LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
+ CompactHelpers.completeInflightCompaction(table, compactionCommitTime, metadata);
+
+ if (compactionTimer != null) {
+ long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
+ try {
+ metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(),
+ durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
+ } catch (ParseException e) {
+ throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
+ + config.getBasePath() + " at time " + compactionCommitTime, e);
}
- // Overwrite/Merge with the user-passed meta-data
- merged.putAll(m);
- return Option.of(merged);
- }).orElseGet(() -> Option.ofNullable(compactionPlan.getExtraMetadata()));
- commitCompaction(writeStatuses, table, compactionInstantTime, true, mergedMetaData);
+ }
+ LOG.info("Compacted successfully on commit " + compactionCommitTime);
+ }
+
+ /**
+ * Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
+ *
+ * @param inflightInstant Inflight Compaction Instant
+ * @param table Hoodie Table
+ */
+ public void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) {
+ table.rollback(jsc, HoodieActiveTimeline.createNewInstantTime(), inflightInstant, false);
+ table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
}
/**
@@ -793,192 +649,32 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param compactionInstantTime Compaction Instant Time
* @return RDD of Write Status
*/
- private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean autoCommit) throws IOException {
- // Create a Hoodie table which encapsulated the commits and files visible
- HoodieTableMetaClient metaClient = createMetaClient(true);
- HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
- HoodieTimeline pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
+ private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) {
+ HoodieTable<T> table = HoodieTable.create(config, jsc);
+ HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
rollbackInflightCompaction(inflightInstant, table);
- metaClient.reloadActiveTimeline();
- pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
+ table.getMetaClient().reloadActiveTimeline();
}
-
- HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
- if (pendingCompactionTimeline.containsInstant(instant)) {
- return runCompaction(instant, metaClient.getActiveTimeline(), autoCommit);
- } else {
- throw new IllegalStateException(
- "No Compaction request available at " + compactionInstantTime + " to run compaction");
- }
- }
-
- /**
- * Perform compaction operations as specified in the compaction commit file.
- *
- * @param compactionInstant Compaction Instant time
- * @param activeTimeline Active Timeline
- * @param autoCommit Commit after compaction
- * @return RDD of Write Status
- */
- private JavaRDD<WriteStatus> runCompaction(HoodieInstant compactionInstant, HoodieActiveTimeline activeTimeline,
- boolean autoCommit) throws IOException {
- HoodieTableMetaClient metaClient = createMetaClient(true);
- HoodieCompactionPlan compactionPlan =
- CompactionUtils.getCompactionPlan(metaClient, compactionInstant.getTimestamp());
- // Mark instant as compaction inflight
- activeTimeline.transitionCompactionRequestedToInflight(compactionInstant);
compactionTimer = metrics.getCompactionCtx();
- // Create a Hoodie table which encapsulated the commits and files visible
- HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
- JavaRDD<WriteStatus> statuses = table.compact(jsc, compactionInstant.getTimestamp(), compactionPlan);
- // Force compaction action
- statuses.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
- // pass extra-metada so that it gets stored in commit file automatically
- commitCompaction(statuses, table, compactionInstant.getTimestamp(), autoCommit,
- Option.ofNullable(compactionPlan.getExtraMetadata()));
- return statuses;
- }
-
- /**
- * Commit Compaction and track metrics.
- *
- * @param compactedStatuses Compaction Write status
- * @param table Hoodie Table
- * @param compactionCommitTime Compaction Commit Time
- * @param autoCommit Auto Commit
- * @param extraMetadata Extra Metadata to store
- */
- protected void commitCompaction(JavaRDD<WriteStatus> compactedStatuses, HoodieTable<T> table,
- String compactionCommitTime, boolean autoCommit, Option<Map<String, String>> extraMetadata) {
- if (autoCommit) {
- HoodieCommitMetadata metadata = doCompactionCommit(table, compactedStatuses, compactionCommitTime, extraMetadata);
- if (compactionTimer != null) {
- long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
- try {
- metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(),
- durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
- } catch (ParseException e) {
- throw new HoodieCommitException("Commit time is not of valid format.Failed to commit compaction "
- + config.getBasePath() + " at time " + compactionCommitTime, e);
- }
- }
- LOG.info("Compacted successfully on commit " + compactionCommitTime);
- } else {
- LOG.info("Compaction did not run for commit " + compactionCommitTime);
+ HoodieWriteMetadata compactionMetadata = table.compact(jsc, compactionInstantTime);
+ JavaRDD<WriteStatus> statuses = compactionMetadata.getWriteStatuses();
+ if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
+ completeCompaction(compactionMetadata.getCommitMetadata().get(), statuses, table, compactionInstantTime);
}
- }
-
- /**
- * Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
- *
- * @param inflightInstant Inflight Compaction Instant
- * @param table Hoodie Table
- */
- public void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) {
- table.rollback(jsc, HoodieActiveTimeline.createNewInstantTime(), inflightInstant, false);
- table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
- }
-
- private HoodieCommitMetadata doCompactionCommit(HoodieTable<T> table, JavaRDD<WriteStatus> writeStatuses,
- String compactionCommitTime, Option<Map<String, String>> extraMetadata) {
- HoodieTableMetaClient metaClient = table.getMetaClient();
- List<HoodieWriteStat> updateStatusMap = writeStatuses.map(WriteStatus::getStat).collect();
-
- HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
- for (HoodieWriteStat stat : updateStatusMap) {
- metadata.addWriteStat(stat.getPartitionPath(), stat);
- }
-
- metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
-
- // Finalize write
- finalizeWrite(table, compactionCommitTime, updateStatusMap);
-
- // Copy extraMetadata
- extraMetadata.ifPresent(m -> {
- m.forEach(metadata::addMetadata);
- });
-
- LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
- HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-
- try {
- activeTimeline.transitionCompactionInflightToComplete(
- new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime),
- Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
- } catch (IOException e) {
- throw new HoodieCompactionException(
- "Failed to commit " + metaClient.getBasePath() + " at time " + compactionCommitTime, e);
- }
- return metadata;
+ return statuses;
}
/**
* Performs a compaction operation on a table, serially before or after an insert/upsert action.
*/
- private Option<String> forceCompact(Option<Map<String, String>> extraMetadata) throws IOException {
+ private Option<String> inlineCompact(Option<Map<String, String>> extraMetadata) {
Option<String> compactionInstantTimeOpt = scheduleCompaction(extraMetadata);
compactionInstantTimeOpt.ifPresent(compactionInstantTime -> {
- try {
- // inline compaction should auto commit as the user is never given control
- compact(compactionInstantTime, true);
- } catch (IOException ioe) {
- throw new HoodieIOException(ioe.getMessage(), ioe);
- }
+ // inline compaction should auto commit as the user is never given control
+ compact(compactionInstantTime, true);
});
return compactionInstantTimeOpt;
}
-
- /**
- * Ensure that the current writerSchema is compatible with the latest schema of this dataset.
- *
- * When inserting/updating data, we read records using the last used schema and convert them to the
- * GenericRecords with writerSchema. Hence, we need to ensure that this conversion can take place without errors.
- *
- * @param hoodieTable The Hoodie Table
- * @param isUpsert If this is a check during upserts
- * @throws HoodieUpsertException If schema check fails during upserts
- * @throws HoodieInsertException If schema check fails during inserts
- */
- private void validateSchema(HoodieTable<T> hoodieTable, final boolean isUpsert)
- throws HoodieUpsertException, HoodieInsertException {
-
- if (!getConfig().getAvroSchemaValidate()) {
- // Check not required
- return;
- }
-
- boolean isValid = false;
- String errorMsg = "WriterSchema is not compatible with the schema present in the Table";
- Throwable internalError = null;
- Schema tableSchema = null;
- Schema writerSchema = null;
- try {
- TableSchemaResolver schemaUtil = new TableSchemaResolver(hoodieTable.getMetaClient());
- writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
- tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableSchemaFromCommitMetadata());
- isValid = schemaUtil.isSchemaCompatible(tableSchema, writerSchema);
- } catch (Exception e) {
- // Two error cases are possible:
- // 1. There was no schema as no data has been inserted yet (first time only)
- // 2. Failure in reading the schema
- isValid = hoodieTable.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 0;
- errorMsg = "Failed to read latest schema on path " + basePath;
- internalError = e;
- }
-
- if (!isValid) {
- LOG.error(errorMsg);
- LOG.warn("WriterSchema: " + writerSchema);
- LOG.warn("Table latest schema: " + tableSchema);
- if (isUpsert) {
- throw new HoodieUpsertException(errorMsg, internalError);
- } else {
- throw new HoodieInsertException(errorMsg, internalError);
- }
- }
- }
-
}
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 910c04c..bb087a2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -22,8 +22,8 @@ import org.apache.hudi.common.config.DefaultHoodieConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.table.compact.strategy.CompactionStrategy;
-import org.apache.hudi.table.compact.strategy.LogFileSizeBasedCompactionStrategy;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
import javax.annotation.concurrent.Immutable;
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 50af725..2d98edc 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.MetricsReporterType;
-import org.apache.hudi.table.compact.strategy.CompactionStrategy;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index 087a2b0..73c1b23 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -24,6 +24,7 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.ParquetReaderIterator;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -46,7 +47,7 @@ import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.table.action.clean.CleanActionExecutor;
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.BulkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.BulkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.DeleteCommitActionExecutor;
@@ -56,6 +57,7 @@ import org.apache.hudi.table.action.commit.UpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.UpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroParquetReader;
@@ -130,14 +132,13 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
@Override
- public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime) {
- throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table");
+ public Option<HoodieCompactionPlan> scheduleCompaction(JavaSparkContext jsc, String instantTime, Option<Map<String, String>> extraMetadata) {
+ throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
}
@Override
- public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionInstantTime,
- HoodieCompactionPlan compactionPlan) {
- throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table");
+ public HoodieWriteMetadata compact(JavaSparkContext jsc, String compactionInstantTime) {
+ throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
}
public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId,
@@ -202,6 +203,11 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
return new CopyOnWriteRollbackActionExecutor(jsc, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
}
+ @Override
+ public HoodieSavepointMetadata savepoint(JavaSparkContext jsc, String instantToSavepoint, String user, String comment) {
+ return new SavepointActionExecutor(jsc, config, this, instantToSavepoint, user, comment).execute();
+ }
+
public HoodieRestoreMetadata restore(JavaSparkContext jsc, String restoreInstantTime, String instantToRestore) {
return new CopyOnWriteRestoreActionExecutor(jsc, config, this, restoreInstantTime, instantToRestore).execute();
}
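With savepoint creation delegated to SavepointActionExecutor, callers go through the new HoodieTable#savepoint method shown above. A sketch, assuming an initialized JavaSparkContext and a table with a completed commit at the chosen instant (instant time, user and comment are illustrative):

    import org.apache.hudi.avro.model.HoodieSavepointMetadata;
    import org.apache.hudi.table.HoodieTable;
    import org.apache.spark.api.java.JavaSparkContext;

    public class SavepointExample {
      public static HoodieSavepointMetadata createSavepoint(JavaSparkContext jsc, HoodieTable<?> table) {
        // Delegates to SavepointActionExecutor (see the hunk above)
        return table.savepoint(jsc, "20200425182644", "admin", "savepoint before schema migration");
      }
    }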
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index 844b9ad..cacc1ef 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -21,19 +21,17 @@ package org.apache.hudi.table;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.compact.RunCompactionActionExecutor;
import org.apache.hudi.table.action.deltacommit.BulkInsertDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.BulkInsertPreppedDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.DeleteDeltaCommitActionExecutor;
@@ -41,17 +39,16 @@ import org.apache.hudi.table.action.deltacommit.InsertDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.InsertPreppedDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.UpsertDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.UpsertPreppedDeltaCommitActionExecutor;
+import org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor;
import org.apache.hudi.table.action.restore.MergeOnReadRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
-import org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import java.io.IOException;
import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Map;
/**
* Implementation of a more real-time Hoodie Table that provides tradeoffs on read and write cost/amplification.
@@ -119,46 +116,16 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
}
@Override
- public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime) {
- LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
- Option<HoodieInstant> lastCompaction =
- getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
- String deltaCommitsSinceTs = "0";
- if (lastCompaction.isPresent()) {
- deltaCommitsSinceTs = lastCompaction.get().getTimestamp();
- }
-
- int deltaCommitsSinceLastCompaction = getActiveTimeline().getDeltaCommitTimeline()
- .findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants();
- if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
- LOG.info("Not running compaction as only " + deltaCommitsSinceLastCompaction
- + " delta commits was found since last compaction " + deltaCommitsSinceTs + ". Waiting for "
- + config.getInlineCompactDeltaCommitMax());
- return new HoodieCompactionPlan();
- }
-
- LOG.info("Compacting merge on read table " + config.getBasePath());
- HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor();
- try {
- return compactor.generateCompactionPlan(jsc, this, config, instantTime,
- ((SyncableFileSystemView) getSliceView()).getPendingCompactionOperations()
- .map(instantTimeCompactionopPair -> instantTimeCompactionopPair.getValue().getFileGroupId())
- .collect(Collectors.toSet()));
-
- } catch (IOException e) {
- throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
- }
+ public Option<HoodieCompactionPlan> scheduleCompaction(JavaSparkContext jsc, String instantTime, Option<Map<String, String>> extraMetadata) {
+ ScheduleCompactionActionExecutor scheduleCompactionExecutor = new ScheduleCompactionActionExecutor(
+ jsc, config, this, instantTime, extraMetadata);
+ return scheduleCompactionExecutor.execute();
}
@Override
- public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionInstantTime,
- HoodieCompactionPlan compactionPlan) {
- HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor();
- try {
- return compactor.compact(jsc, compactionPlan, this, config, compactionInstantTime);
- } catch (IOException e) {
- throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e);
- }
+ public HoodieWriteMetadata compact(JavaSparkContext jsc, String compactionInstantTime) {
+ RunCompactionActionExecutor compactionExecutor = new RunCompactionActionExecutor(jsc, config, this, compactionInstantTime);
+ return compactionExecutor.execute();
}
@Override
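The reworked contract splits compaction into planning and execution: scheduleCompaction persists a requested instant and returns the plan (or an empty Option), and compact materializes that plan into a HoodieWriteMetadata. A sketch of driving both steps, assuming the caller supplies a valid new instant time:

    import org.apache.hudi.avro.model.HoodieCompactionPlan;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.table.HoodieTable;
    import org.apache.hudi.table.action.HoodieWriteMetadata;
    import org.apache.spark.api.java.JavaSparkContext;

    public class CompactionFlowExample {
      public static void scheduleAndRun(JavaSparkContext jsc, HoodieTable<?> table, String instantTime) {
        // Step 1: write a compaction plan to the timeline, if enough delta commits accumulated
        Option<HoodieCompactionPlan> plan = table.scheduleCompaction(jsc, instantTime, Option.empty());
        if (plan.isPresent()) {
          // Step 2: execute the requested plan; committing the result is left to the caller
          HoodieWriteMetadata result = table.compact(jsc, instantTime);
        }
      }
    }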
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 0275b72..9904411 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -18,16 +18,17 @@
package org.apache.hudi.table;
+import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.SparkTaskContextSupplier;
-import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.ConsistencyGuard;
import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility;
@@ -38,10 +39,10 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -54,9 +55,10 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieSavepointException;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -295,24 +297,6 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
return getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
}
- /**
- * Get the list of data file names savepointed.
- */
- public Stream<String> getSavepointedDataFiles(String savepointTime) {
- if (!getSavepoints().contains(savepointTime)) {
- throw new HoodieSavepointException(
- "Could not get data files for savepoint " + savepointTime + ". No such savepoint.");
- }
- HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
- HoodieSavepointMetadata metadata;
- try {
- metadata = TimelineMetadataUtils.deserializeHoodieSavepointMetadata(getActiveTimeline().getInstantDetails(instant).get());
- } catch (IOException e) {
- throw new HoodieSavepointException("Could not get savepointed data files for savepoint " + savepointTime, e);
- }
- return metadata.getPartitionMetadata().values().stream().flatMap(s -> s.getSavepointDataFile().stream());
- }
-
public HoodieActiveTimeline getActiveTimeline() {
return metaClient.getActiveTimeline();
}
@@ -329,19 +313,21 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
*
* @param jsc Spark Context
* @param instantTime Instant Time for scheduling compaction
+ * @param extraMetadata additional metadata to write into plan
* @return
*/
- public abstract HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime);
+ public abstract Option<HoodieCompactionPlan> scheduleCompaction(JavaSparkContext jsc,
+ String instantTime,
+ Option<Map<String, String>> extraMetadata);
/**
* Run Compaction on the table. Compaction arranges the data so that it is optimized for data access.
*
* @param jsc Spark Context
* @param compactionInstantTime Instant Time
- * @param compactionPlan Compaction Plan
*/
- public abstract JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionInstantTime,
- HoodieCompactionPlan compactionPlan);
+ public abstract HoodieWriteMetadata compact(JavaSparkContext jsc,
+ String compactionInstantTime);
/**
* Executes a new clean action.
@@ -366,6 +352,15 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
boolean deleteInstants);
/**
+ * Create a savepoint at the specified instant, so that the table can be restored
+ * to this point in the timeline later if needed.
+ */
+ public abstract HoodieSavepointMetadata savepoint(JavaSparkContext jsc,
+ String instantToSavepoint,
+ String user,
+ String comment);
+
+ /**
* Restore the table to the given instant. Note that this is an admin table recovery operation
* that would cause any running queries that are accessing file slices written after the instant to fail.
*/
@@ -519,4 +514,52 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
public SparkTaskContextSupplier getSparkTaskContextSupplier() {
return sparkTaskContextSupplier;
}
+
+ /**
+ * Ensure that the current writerSchema is compatible with the latest schema of this dataset.
+ *
+ * When inserting/updating data, we read records using the last used schema and convert them to
+ * GenericRecords using the writerSchema. Hence, we need to ensure that this conversion can take place without errors.
+ *
+ */
+ private void validateSchema() throws HoodieUpsertException, HoodieInsertException {
+
+ if (!config.getAvroSchemaValidate() || getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()) {
+ // Check not required
+ return;
+ }
+
+ Schema tableSchema;
+ Schema writerSchema;
+ boolean isValid;
+ try {
+ TableSchemaResolver schemaUtil = new TableSchemaResolver(getMetaClient());
+ writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
+ tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableSchemaFromCommitMetadata());
+ isValid = TableSchemaResolver.isSchemaCompatible(tableSchema, writerSchema);
+ } catch (Exception e) {
+ throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);
+ }
+
+ if (!isValid) {
+ throw new HoodieException("Failed schema compatibility check for writerSchema :" + writerSchema
+ + ", table schema :" + tableSchema + ", base path :" + metaClient.getBasePath());
+ }
+ }
+
+ public void validateUpsertSchema() throws HoodieUpsertException {
+ try {
+ validateSchema();
+ } catch (HoodieException e) {
+ throw new HoodieUpsertException("Failed upsert schema compatibility check.", e);
+ }
+ }
+
+ public void validateInsertSchema() throws HoodieInsertException {
+ try {
+ validateSchema();
+ } catch (HoodieException e) {
+ throw new HoodieInsertException("Failed insert schema compability check.", e);
+ }
+ }
}
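validateUpsertSchema and validateInsertSchema give every write path a single pre-flight check. A minimal sketch of invoking it before an upsert:

    import org.apache.hudi.table.HoodieTable;

    public class SchemaCheckExample {
      public static void preFlight(HoodieTable<?> table) {
        // Throws HoodieUpsertException if the writer schema cannot be reconciled with the
        // latest table schema; a no-op when validation is disabled or no commits exist yet.
        table.validateUpsertSchema();
      }
    }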
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
similarity index 98%
rename from hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java
rename to hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index 5bd81ee..bfe9c9d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -68,18 +68,18 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
- * Archiver to bound the growth of <action>.commit files.
+ * Archiver to bound the growth of files under the .hoodie meta path.
*/
-public class HoodieCommitArchiveLog {
+public class HoodieTimelineArchiveLog {
- private static final Logger LOG = LogManager.getLogger(HoodieCommitArchiveLog.class);
+ private static final Logger LOG = LogManager.getLogger(HoodieTimelineArchiveLog.class);
private final Path archiveFilePath;
private final HoodieTableMetaClient metaClient;
private final HoodieWriteConfig config;
private Writer writer;
- public HoodieCommitArchiveLog(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
+ public HoodieTimelineArchiveLog(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
this.config = config;
this.metaClient = metaClient;
this.archiveFilePath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
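Only the class name changes in this rename; construction is unchanged. A sketch of driving the archiver, assuming the archiveIfRequired() entry point is carried over unchanged from HoodieCommitArchiveLog (that method is outside this hunk):

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.table.HoodieTimelineArchiveLog;

    public class ArchiveExample {
      public static void archive(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
        HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, metaClient);
        // Assumed entry point: archives eligible instants once the timeline exceeds configured bounds
        archiveLog.archiveIfRequired();
      }
    }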
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteMetadata.java b/hudi-client/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
similarity index 98%
rename from hudi-client/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteMetadata.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
index 64fd4df..c46102a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteMetadata.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.table.action.commit;
+package org.apache.hudi.table.action;
import java.util.List;
import org.apache.hudi.client.WriteStatus;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 62203a3..9750162 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -19,6 +19,7 @@
package org.apache.hudi.table.action.clean;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
@@ -39,6 +40,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -50,6 +52,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Cleaner is responsible for garbage collecting older files in a given partition path. Such that
@@ -82,6 +85,25 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
}
/**
+ * Get the list of data file names savepointed.
+ */
+ public Stream<String> getSavepointedDataFiles(String savepointTime) {
+ if (!hoodieTable.getSavepoints().contains(savepointTime)) {
+ throw new HoodieSavepointException(
+ "Could not get data files for savepoint " + savepointTime + ". No such savepoint.");
+ }
+ HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
+ HoodieSavepointMetadata metadata;
+ try {
+ metadata = TimelineMetadataUtils.deserializeHoodieSavepointMetadata(
+ hoodieTable.getActiveTimeline().getInstantDetails(instant).get());
+ } catch (IOException e) {
+ throw new HoodieSavepointException("Could not get savepointed data files for savepoint " + savepointTime, e);
+ }
+ return metadata.getPartitionMetadata().values().stream().flatMap(s -> s.getSavepointDataFile().stream());
+ }
+
+ /**
* Returns list of partitions where clean operations needs to be performed.
*
* @param newInstantToRetain New instant to be retained after this cleanup operation
@@ -131,7 +153,8 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
List<String> deletePaths = new ArrayList<>();
// Collect all the datafiles savepointed by all the savepoints
List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
- .flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList());
+ .flatMap(this::getSavepointedDataFiles)
+ .collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) {
int keepVersions = config.getCleanerFileVersionsRetained();
@@ -190,7 +213,8 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
// Collect all the datafiles savepointed by all the savepoints
List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
- .flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList());
+ .flatMap(this::getSavepointedDataFiles)
+ .collect(Collectors.toList());
// determine if we have enough commits, to start cleaning.
if (commitTimeline.countInstants() > commitsRetained) {
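Moving getSavepointedDataFiles into CleanPlanner keeps the savepoint guard local to cleaning: both retention policies above exclude savepointed files before picking deletions. A self-contained sketch of that exclusion (isolated from CleanPlanner; collection types simplified for illustration):

    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class SavepointGuardSketch {
      // Files referenced by any savepoint must never be scheduled for deletion
      public static List<String> filterDeletable(List<String> candidates, Set<String> savepointedFiles) {
        return candidates.stream()
            .filter(file -> !savepointedFiles.contains(file))
            .collect(Collectors.toList());
      }
    }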
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index e72c801..93a655e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -39,6 +39,7 @@ import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.BaseActionExecutor;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
@@ -165,8 +166,6 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>>
(HoodieTable<T>)table);
result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
result.setWriteStatuses(statuses);
-
- // Trigger the insert and collect statuses
commitOnAutoCommit(result);
}
@@ -207,7 +206,6 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>>
try {
activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-
LOG.info("Committed " + instantTime);
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java
index e0182da..9f5468e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java
@@ -27,6 +27,7 @@ import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
index fbc8dbb..4755664 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
@@ -30,6 +30,7 @@ import org.apache.hudi.execution.BulkInsertMapFunction;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import java.util.List;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java
index 0b8e75f..3d80a07 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java
@@ -27,6 +27,7 @@ import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteCommitActionExecutor.java
index ba25a97..8fa1cb7 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteCommitActionExecutor.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java
index 7ee891f..8c0b75f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java
@@ -27,6 +27,7 @@ import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -58,8 +59,8 @@ public class DeleteHelper<T extends HoodieRecordPayload<T>> {
}
public static <T extends HoodieRecordPayload<T>> HoodieWriteMetadata execute(String instantTime,
- JavaRDD<HoodieKey> keys, JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<T> table,
- CommitActionExecutor<T> deleteExecutor) {
+ JavaRDD<HoodieKey> keys, JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<T> table,
+ CommitActionExecutor<T> deleteExecutor) {
try {
HoodieWriteMetadata result = null;
// De-dupe/merge if needed
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertCommitActionExecutor.java
index d08dab2..d8944e3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertCommitActionExecutor.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertPreppedCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertPreppedCommitActionExecutor.java
index acc9902..b7d64b1 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertPreppedCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertPreppedCommitActionExecutor.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertCommitActionExecutor.java
index efdcae1..0c4d08e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertCommitActionExecutor.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPreppedCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPreppedCommitActionExecutor.java
index 5999104..d8470ea 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPreppedCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPreppedCommitActionExecutor.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/WriteHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/WriteHelper.java
index 7faee1c..92dcbb6 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/WriteHelper.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/WriteHelper.java
@@ -25,6 +25,7 @@ import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -35,9 +36,9 @@ import scala.Tuple2;
public class WriteHelper<T extends HoodieRecordPayload<T>> {
public static <T extends HoodieRecordPayload<T>> HoodieWriteMetadata write(String instantTime,
- JavaRDD<HoodieRecord<T>> inputRecordsRDD, JavaSparkContext jsc,
- HoodieTable<T> table, boolean shouldCombine,
- int shuffleParallelism, CommitActionExecutor<T> executor, boolean performTagging) {
+ JavaRDD<HoodieRecord<T>> inputRecordsRDD, JavaSparkContext jsc,
+ HoodieTable<T> table, boolean shouldCombine,
+ int shuffleParallelism, CommitActionExecutor<T> executor, boolean performTagging) {
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
new file mode 100644
index 0000000..97fdd0f
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieCompactionException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class CompactHelpers {
+
+ public static HoodieCommitMetadata createCompactionMetadata(HoodieTable<?> table,
+ String compactionInstantTime,
+ JavaRDD<WriteStatus> writeStatuses,
+ String schema) throws IOException {
+ byte[] planBytes = table.getActiveTimeline().readCompactionPlanAsBytes(
+ HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get();
+ HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(planBytes);
+ List<HoodieWriteStat> updateStatusMap = writeStatuses.map(WriteStatus::getStat).collect();
+ org.apache.hudi.common.model.HoodieCommitMetadata metadata = new org.apache.hudi.common.model.HoodieCommitMetadata(true);
+ for (HoodieWriteStat stat : updateStatusMap) {
+ metadata.addWriteStat(stat.getPartitionPath(), stat);
+ }
+ metadata.addMetadata(org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY, schema);
+ if (compactionPlan.getExtraMetadata() != null) {
+ compactionPlan.getExtraMetadata().forEach(metadata::addMetadata);
+ }
+ return metadata;
+ }
+
+ public static void completeInflightCompaction(HoodieTable<?> table, String compactionCommitTime, HoodieCommitMetadata commitMetadata) {
+ HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+ try {
+ activeTimeline.transitionCompactionInflightToComplete(
+ new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime),
+ Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ } catch (IOException e) {
+ throw new HoodieCompactionException(
+ "Failed to commit " + table.getMetaClient().getBasePath() + " at time " + compactionCommitTime, e);
+ }
+ }
+}
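A sketch of the call sequence these helpers are meant for when finishing a compaction; writeStatuses would come from a prior table.compact(...) run, and the schema string is whatever the write config carries:

    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.table.HoodieTable;
    import org.apache.hudi.table.action.compact.CompactHelpers;
    import org.apache.spark.api.java.JavaRDD;

    import java.io.IOException;

    public class CompactCompletionExample {
      public static void complete(HoodieTable<?> table, String compactionInstantTime,
                                  JavaRDD<WriteStatus> writeStatuses, String schema) throws IOException {
        // Fold write stats plus the plan's extra metadata into commit metadata
        HoodieCommitMetadata metadata =
            CompactHelpers.createCompactionMetadata(table, compactionInstantTime, writeStatuses, schema);
        // Transition <instant>.compaction.inflight to a completed commit on the timeline
        CompactHelpers.completeInflightCompaction(table, compactionInstantTime, metadata);
      }
    }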
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieCompactor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
similarity index 98%
rename from hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieCompactor.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index f5579dd..c81b028 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieCompactor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.table.compact;
+package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
similarity index 98%
rename from hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
index f71ab95..2c0afa9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.table.compact;
+package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
@@ -42,7 +42,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieCopyOnWriteTable;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.compact.strategy.CompactionStrategy;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
@@ -71,7 +71,6 @@ import static java.util.stream.Collectors.toList;
* passes it through a CompactionFilter, executes all the compactions, writes a new version of base files and makes
* a normal commit
*
- * @see HoodieCompactor
*/
public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/OperationResult.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/OperationResult.java
similarity index 97%
rename from hudi-client/src/main/java/org/apache/hudi/table/compact/OperationResult.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/OperationResult.java
index 53bc48a..cd00cb9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/OperationResult.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/OperationResult.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.table.compact;
+package org.apache.hudi.table.action.compact;
import org.apache.hudi.common.util.Option;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
new file mode 100644
index 0000000..2f99fa1
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.SparkConfigUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCompactionException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.List;
+
+public class RunCompactionActionExecutor extends BaseActionExecutor<HoodieWriteMetadata> {
+
+ private static final Logger LOG = LogManager.getLogger(RunCompactionActionExecutor.class);
+
+ public RunCompactionActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config,
+ HoodieTable<?> table,
+ String instantTime) {
+ super(jsc, config, table, instantTime);
+ }
+
+ @Override
+ public HoodieWriteMetadata execute() {
+ HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(instantTime);
+ HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
+ if (!pendingCompactionTimeline.containsInstant(instant)) {
+ throw new IllegalStateException(
+ "No Compaction request available at " + instantTime + " to run compaction");
+ }
+
+ HoodieWriteMetadata compactionMetadata = new HoodieWriteMetadata();
+ try {
+ HoodieActiveTimeline timeline = table.getActiveTimeline();
+ HoodieCompactionPlan compactionPlan =
+ CompactionUtils.getCompactionPlan(table.getMetaClient(), instantTime);
+ // Mark instant as compaction inflight
+ timeline.transitionCompactionRequestedToInflight(instant);
+ table.getMetaClient().reloadActiveTimeline();
+
+ HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor();
+ JavaRDD<WriteStatus> statuses = compactor.compact(jsc, compactionPlan, table, config, instantTime);
+
+ statuses.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
+ List<HoodieWriteStat> updateStatusMap = statuses.map(WriteStatus::getStat).collect();
+ HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
+ for (HoodieWriteStat stat : updateStatusMap) {
+ metadata.addWriteStat(stat.getPartitionPath(), stat);
+ }
+ metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
+
+ compactionMetadata.setWriteStatuses(statuses);
+ compactionMetadata.setCommitted(false);
+ compactionMetadata.setCommitMetadata(Option.of(metadata));
+ } catch (IOException e) {
+ throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e);
+ }
+
+ return compactionMetadata;
+ }
+}
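Note that execute() hands back the results with committed=false, so completing the commit is left to the caller (for example via the CompactHelpers above). A sketch of invoking the executor directly, assuming a <instant>.compaction.requested already exists on the timeline:

    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.table.HoodieTable;
    import org.apache.hudi.table.action.HoodieWriteMetadata;
    import org.apache.hudi.table.action.compact.RunCompactionActionExecutor;
    import org.apache.spark.api.java.JavaSparkContext;

    public class RunCompactionExample {
      public static HoodieWriteMetadata run(JavaSparkContext jsc, HoodieWriteConfig config,
                                            HoodieTable<?> table, String compactionInstantTime) {
        // Transitions requested -> inflight, runs the compactor, returns uncommitted write metadata
        return new RunCompactionActionExecutor(jsc, config, table, compactionInstantTime).execute();
      }
    }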
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
new file mode 100644
index 0000000..586b5b3
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCompactionException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ScheduleCompactionActionExecutor extends BaseActionExecutor<Option<HoodieCompactionPlan>> {
+
+ private static final Logger LOG = LogManager.getLogger(ScheduleCompactionActionExecutor.class);
+
+ private final Option<Map<String, String>> extraMetadata;
+
+ public ScheduleCompactionActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config,
+ HoodieTable<?> table,
+ String instantTime,
+ Option<Map<String, String>> extraMetadata) {
+ super(jsc, config, table, instantTime);
+ this.extraMetadata = extraMetadata;
+ }
+
+ private HoodieCompactionPlan scheduleCompaction() {
+ LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
+ Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
+ String deltaCommitsSinceTs = "0";
+ if (lastCompaction.isPresent()) {
+ deltaCommitsSinceTs = lastCompaction.get().getTimestamp();
+ }
+
+ int deltaCommitsSinceLastCompaction = table.getActiveTimeline().getDeltaCommitTimeline()
+ .findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants();
+ if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
+ LOG.info("Not running compaction as only " + deltaCommitsSinceLastCompaction
+ + " delta commits was found since last compaction " + deltaCommitsSinceTs + ". Waiting for "
+ + config.getInlineCompactDeltaCommitMax());
+ return new HoodieCompactionPlan();
+ }
+
+ LOG.info("Compacting merge on read table " + config.getBasePath());
+ HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor();
+ try {
+ return compactor.generateCompactionPlan(jsc, table, config, instantTime,
+ ((SyncableFileSystemView) table.getSliceView()).getPendingCompactionOperations()
+ .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
+ .collect(Collectors.toSet()));
+
+ } catch (IOException e) {
+ throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
+ }
+ }
+
+ @Override
+ public Option<HoodieCompactionPlan> execute() {
+ // if there are inflight writes, their instantTime must not be earlier than the compaction instant time
+ table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
+ .ifPresent(earliestInflight -> ValidationUtils.checkArgument(
+ HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), instantTime, HoodieTimeline.GREATER),
+ "Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
+ + ", Compaction scheduled at " + instantTime));
+
+ // Committed and pending compaction instants should have strictly lower timestamps
+ List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
+ .getCommitsAndCompactionTimeline().getInstants()
+ .filter(instant -> HoodieTimeline.compareTimestamps(
+ instant.getTimestamp(), instantTime, HoodieTimeline.GREATER_OR_EQUAL))
+ .collect(Collectors.toList());
+ ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
+ "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
+ + conflictingInstants);
+
+ HoodieCompactionPlan plan = scheduleCompaction();
+ if (plan != null && (plan.getOperations() != null) && (!plan.getOperations().isEmpty())) {
+ extraMetadata.ifPresent(plan::setExtraMetadata);
+ HoodieInstant compactionInstant =
+ new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
+ try {
+ table.getActiveTimeline().saveToCompactionRequested(compactionInstant,
+ TimelineMetadataUtils.serializeCompactionPlan(plan));
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Exception scheduling compaction", ioe);
+ }
+ return Option.of(plan);
+ }
+ return Option.empty();
+ }
+}
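A sketch of scheduling with operator-supplied metadata; Option.empty() comes back when too few delta commits have accumulated or the generated plan has no operations (the metadata key/value is illustrative):

    import org.apache.hudi.avro.model.HoodieCompactionPlan;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.table.HoodieTable;
    import org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor;
    import org.apache.spark.api.java.JavaSparkContext;

    import java.util.Collections;
    import java.util.Map;

    public class ScheduleCompactionExample {
      public static Option<HoodieCompactionPlan> schedule(JavaSparkContext jsc, HoodieWriteConfig config,
                                                          HoodieTable<?> table, String instantTime) {
        Map<String, String> extra = Collections.singletonMap("triggered-by", "nightly-job"); // illustrative
        // Validates instant ordering, builds a plan, and persists <instant>.compaction.requested
        return new ScheduleCompactionActionExecutor(jsc, config, table, instantTime, Option.of(extra)).execute();
      }
    }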
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/BoundedIOCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedIOCompactionStrategy.java
similarity index 97%
rename from hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/BoundedIOCompactionStrategy.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedIOCompactionStrategy.java
index f77a015..d93a50f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/BoundedIOCompactionStrategy.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedIOCompactionStrategy.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.table.compact.strategy;
+package org.apache.hudi.table.action.compact.strategy;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/BoundedPartitionAwareCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedPartitionAwareCompactionStrategy.java
similarity index 98%
rename from hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/BoundedPartitionAwareCompactionStrategy.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedPartitionAwareCompactionStrategy.java
index 24da226..597348f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/BoundedPartitionAwareCompactionStrategy.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedPartitionAwareCompactionStrategy.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.table.compact.strategy;
+package org.apache.hudi.table.action.compact.strategy;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/CompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/CompactionStrategy.java
similarity index 97%
rename from hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/CompactionStrategy.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/CompactionStrategy.java
index 928ae3a..0e8e4c6 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/CompactionStrategy.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/CompactionStrategy.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.table.compact.strategy;
+package org.apache.hudi.table.action.compact.strategy;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
@@ -26,7 +26,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor;
+import org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor;
import java.io.Serializable;
import java.util.HashMap;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/DayBasedCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/DayBasedCompactionStrategy.java
similarity index 98%
rename from hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/DayBasedCompactionStrategy.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/DayBasedCompactionStrategy.java
index ea29fdd..4a12bb8 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/DayBasedCompactionStrategy.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/DayBasedCompactionStrategy.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.table.compact.strategy;
+package org.apache.hudi.table.action.compact.strategy;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/LogFileSizeBasedCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java
similarity index 98%
rename from hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/LogFileSizeBasedCompactionStrategy.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java
index 83f1cca..c9a811a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/LogFileSizeBasedCompactionStrategy.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.table.compact.strategy;
+package org.apache.hudi.table.action.compact.strategy;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/UnBoundedCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java
similarity index 96%
rename from hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/UnBoundedCompactionStrategy.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java
index 84f4ee0..ffc437b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/UnBoundedCompactionStrategy.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.table.compact.strategy;
+package org.apache.hudi.table.action.compact.strategy;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java
similarity index 98%
rename from hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java
index 9c00e5e..66b5612 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.table.compact.strategy;
+package org.apache.hudi.table.action.compact.strategy;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java
index 95779a7..5e4b915 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java
@@ -28,7 +28,7 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
import org.apache.hudi.table.action.commit.BulkInsertHelper;
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java
index 7b6e146..5a3fe7a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java
@@ -28,7 +28,7 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
import org.apache.hudi.table.action.commit.BulkInsertHelper;
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeleteDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeleteDeltaCommitActionExecutor.java
index 1575406..53d4d84 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeleteDeltaCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeleteDeltaCommitActionExecutor.java
@@ -25,7 +25,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.DeleteHelper;
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertDeltaCommitActionExecutor.java
index f76f7fc..2124165 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertDeltaCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertDeltaCommitActionExecutor.java
@@ -24,7 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.WriteHelper;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertPreppedDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertPreppedDeltaCommitActionExecutor.java
index 55031ea..0fb787e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertPreppedDeltaCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/InsertPreppedDeltaCommitActionExecutor.java
@@ -23,7 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitActionExecutor.java
index 1fdf433..1809078 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertDeltaCommitActionExecutor.java
@@ -24,7 +24,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.WriteHelper;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertPreppedDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertPreppedDeltaCommitActionExecutor.java
index 413d2e2..d1773f9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertPreppedDeltaCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/UpsertPreppedDeltaCommitActionExecutor.java
@@ -23,7 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
index 8b95198..498d7b7 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
@@ -118,7 +118,7 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto
config.shouldAssumeDatePartitioning());
int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> {
- HoodieActiveTimeline activeTimeline = table.getActiveTimeline().reload();
+ HoodieActiveTimeline activeTimeline = table.getMetaClient().reloadActiveTimeline();
List<RollbackRequest> partitionRollbackRequests = new ArrayList<>();
switch (instantToRollback.getAction()) {
case HoodieTimeline.COMMIT_ACTION:
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
new file mode 100644
index 0000000..d467074
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.savepoint;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieSavepointException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class SavepointActionExecutor extends BaseActionExecutor<HoodieSavepointMetadata> {
+
+ private static final Logger LOG = LogManager.getLogger(SavepointActionExecutor.class);
+
+ private final String user;
+ private final String comment;
+
+ public SavepointActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config,
+ HoodieTable<?> table,
+ String instantTime,
+ String user,
+ String comment) {
+ super(jsc, config, table, instantTime);
+ this.user = user;
+ this.comment = comment;
+ }
+
+ @Override
+ public HoodieSavepointMetadata execute() {
+ if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
+ throw new UnsupportedOperationException("Savepointing is not supported for MergeOnRead table types");
+ }
+ Option<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant();
+ HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime);
+ if (!table.getCompletedCommitsTimeline().containsInstant(commitInstant)) {
+ throw new HoodieSavepointException("Could not savepoint non-existent commit " + commitInstant);
+ }
+
+ try {
+ // Find the last commit that was not cleaned and verify the savepoint time is >= that commit
+ String lastCommitRetained;
+ if (cleanInstant.isPresent()) {
+ HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
+ .deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get());
+ lastCommitRetained = cleanMetadata.getEarliestCommitToRetain();
+ } else {
+ lastCommitRetained = table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp();
+ }
+
+ // Cannot allow savepoint time on a commit that could have been cleaned
+ ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, lastCommitRetained, HoodieTimeline.GREATER_OR_EQUAL),
+ "Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
+
+ Map<String, List<String>> latestFilesMap = jsc.parallelize(FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
+ table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()))
+ .mapToPair(partitionPath -> {
+ // Collect the latest base files in this partition as of the savepoint commit time
+ LOG.info("Collecting latest files in partition path " + partitionPath);
+ TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
+ List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
+ .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
+ return new Tuple2<>(partitionPath, latestFiles);
+ })
+ .collectAsMap();
+
+ HoodieSavepointMetadata metadata = TimelineMetadataUtils.convertSavepointMetadata(user, comment, latestFilesMap);
+ // Record the savepoint on the timeline: create the inflight instant, then complete it with the metadata
+ table.getActiveTimeline().createNewInstant(
+ new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, instantTime));
+ table.getActiveTimeline()
+ .saveAsComplete(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, instantTime),
+ TimelineMetadataUtils.serializeSavepointMetadata(metadata));
+ LOG.info("Savepoint " + instantTime + " created");
+ return metadata;
+ } catch (IOException e) {
+ throw new HoodieSavepointException("Failed to savepoint " + instantTime, e);
+ }
+ }
+}
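For orientation, a minimal sketch of driving the new executor directly; this assumes the HoodieTable.create(config, jsc) factory used in the tests later in this diff, and the user/comment strings are placeholders:

  // Hedged sketch: create a savepoint for the latest completed commit on a COPY_ON_WRITE table.
  HoodieTable<?> table = HoodieTable.create(config, jsc);
  String instantTime = table.getCompletedCommitsTimeline().lastInstant().get().getTimestamp();
  HoodieSavepointMetadata metadata =
      new SavepointActionExecutor(jsc, config, table, instantTime, "admin", "placeholder comment").execute();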
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java b/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java
new file mode 100644
index 0000000..06acd46
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.savepoint;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+public class SavepointHelpers {
+
+ private static final Logger LOG = LogManager.getLogger(SavepointHelpers.class);
+
+ public static void deleteSavepoint(HoodieTable<?> table, String savepointTime) {
+ if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
+ throw new UnsupportedOperationException("Savepointing is not supported for MergeOnRead table types");
+ }
+ HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
+ boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint);
+ if (!isSavepointPresent) {
+ LOG.warn("No savepoint present " + savepointTime);
+ return;
+ }
+
+ table.getActiveTimeline().revertToInflight(savePoint);
+ table.getActiveTimeline().deleteInflight(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, savepointTime));
+ LOG.info("Savepoint " + savepointTime + " deleted");
+ }
+
+ public static void validateSavepointRestore(HoodieTable<?> table, String savepointTime) {
+ // Make sure the restore was successful
+ table.getMetaClient().reloadActiveTimeline();
+ Option<HoodieInstant> lastInstant = table.getActiveTimeline()
+ .getCommitsAndCompactionTimeline()
+ .filterCompletedAndCompactionInstants()
+ .lastInstant();
+ ValidationUtils.checkArgument(lastInstant.isPresent());
+ ValidationUtils.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime),
+ savepointTime + " is not the last commit after restoring to savepoint, last commit was "
+ + lastInstant.get().getTimestamp());
+ }
+
+ public static void validateSavepointPresence(HoodieTable<?> table, String savepointTime) {
+ HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
+ boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint);
+ if (!isSavepointPresent) {
+ throw new HoodieRollbackException("No savepoint for instantTime " + savepointTime);
+ }
+ }
+}
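The helpers above are small and stateless; a hedged sketch of the expected call sequence around a restore (the restore step itself is elided, as it lives in the write client):

  // Hedged sketch: guard a restore-to-savepoint with the two validations.
  SavepointHelpers.validateSavepointPresence(table, savepointTime);
  // ... restore logic in HoodieWriteClient runs here (elided) ...
  SavepointHelpers.validateSavepointRestore(table, savepointTime);
  // Deleting is tolerant: a missing savepoint only logs a warning and returns.
  SavepointHelpers.deleteSavepoint(table, savepointTime);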
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index 16ccb05..7fc0dee 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -58,7 +58,7 @@ public class TestClientRollback extends TestHoodieClientBase {
public void testSavepointAndRollback() throws Exception {
HoodieWriteConfig cfg = getConfigBuilder().withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build()).build();
- try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
+ try (HoodieWriteClient client = getHoodieWriteClient(cfg)) {
HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
/**
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
index 383fd74..9726465 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
@@ -32,7 +32,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.table.compact.OperationResult;
+import org.apache.hudi.table.action.compact.OperationResult;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
index 7bfd768..841a09d 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
@@ -31,7 +31,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieCommitArchiveLog;
+import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -79,7 +79,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).forTable("test-trip-table").build();
metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
+ HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
boolean result = archiveLog.archiveIfRequired(jsc);
assertTrue(result);
}
@@ -157,7 +157,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
verifyInflightInstants(metaClient, 2);
metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
+ HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
assertTrue(archiveLog.archiveIfRequired(jsc));
@@ -216,7 +216,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
+ HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
// Requested Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "100"), dfs.getConf());
@@ -281,7 +281,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
+ HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf());
@@ -307,7 +307,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
+ HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
HoodieTestDataGenerator.createSavepointFile(basePath, "101", dfs.getConf());
@@ -339,7 +339,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
+ HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCompactionRequestedFile(basePath, "101", dfs.getConf());
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
@@ -386,7 +386,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
+ HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "1", dfs.getConf());
HoodieInstant instant1 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "1");
@@ -426,7 +426,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
+ HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
org.apache.hudi.avro.model.HoodieCommitMetadata expectedCommitMetadata = archiveLog.convertCommitMetadata(hoodieCommitMetadata);
assertEquals(expectedCommitMetadata.getOperationType(), WriteOperationType.INSERT.toString());
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
similarity index 99%
rename from hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
rename to hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index b5151b3..142e63a 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.table.compact;
+package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
similarity index 82%
rename from hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
rename to hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index d77d3f8..819a41c 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -16,8 +16,9 @@
* limitations under the License.
*/
-package org.apache.hudi.table.compact;
+package org.apache.hudi.table.action.compact;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieClientTestHarness;
@@ -29,6 +30,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
@@ -50,6 +52,7 @@ import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
public class TestHoodieCompactor extends HoodieClientTestHarness {
@@ -100,9 +103,10 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
@Test(expected = HoodieNotSupportedException.class)
public void testCompactionOnCopyOnWriteFail() throws Exception {
metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
- HoodieTable table = HoodieTable.create(metaClient, getConfig(), jsc);
+ HoodieTable<?> table = HoodieTable.create(metaClient, getConfig(), jsc);
String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
- table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));
+ table.scheduleCompaction(jsc, compactionInstantTime, Option.empty());
+ table.compact(jsc, compactionInstantTime);
}
@Test
@@ -118,9 +122,8 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
writeClient.insert(recordsRDD, newCommitTime).collect();
String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
- JavaRDD<WriteStatus> result =
- table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));
- assertTrue("If there is nothing to compact, result will be empty", result.isEmpty());
+ Option<HoodieCompactionPlan> plan = table.scheduleCompaction(jsc, compactionInstantTime, Option.empty());
+ assertFalse("If there is nothing to compact, result will be empty", plan.isPresent());
}
}
@@ -128,18 +131,16 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
public void testWriteStatusContentsAfterCompaction() throws Exception {
// insert 100 records
HoodieWriteConfig config = getConfig();
- try (HoodieWriteClient writeClient = getWriteClient(config);) {
+ try (HoodieWriteClient writeClient = getWriteClient(config)) {
String newCommitTime = "100";
writeClient.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
- List<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime).collect();
+ writeClient.insert(recordsRDD, newCommitTime).collect();
// Update all the 100 records
- metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieTable table = HoodieTable.create(metaClient, config, jsc);
-
+ HoodieTable table = HoodieTable.create(config, jsc);
newCommitTime = "101";
writeClient.startCommitWithTime(newCommitTime);
@@ -153,8 +154,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords);
// Verify that all data file has one log file
- metaClient = HoodieTableMetaClient.reload(metaClient);
- table = HoodieTable.create(metaClient, config, jsc);
+ table = HoodieTable.create(config, jsc);
for (String partitionPath : dataGen.getPartitionPaths()) {
List<FileSlice> groupedLogFiles =
table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
@@ -162,14 +162,14 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
assertEquals("There should be 1 log file written for every data file", 1, fileSlice.getLogFiles().count());
}
}
+ HoodieTestUtils.createDeltaCommitFiles(basePath, newCommitTime);
// Do a compaction
- metaClient = HoodieTableMetaClient.reload(metaClient);
- table = HoodieTable.create(metaClient, config, jsc);
-
- String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
- JavaRDD<WriteStatus> result =
- table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));
+ table = HoodieTable.create(config, jsc);
+ String compactionInstantTime = "102";
+ table.scheduleCompaction(jsc, compactionInstantTime, Option.empty());
+ table.getMetaClient().reloadActiveTimeline();
+ JavaRDD<WriteStatus> result = table.compact(jsc, compactionInstantTime).getWriteStatuses();
// Verify that all partition paths are present in the WriteStatus result
for (String partitionPath : dataGen.getPartitionPaths()) {
@@ -184,8 +184,4 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
protected HoodieTableType getTableType() {
return HoodieTableType.MERGE_ON_READ;
}
-
- // TODO - after modifying HoodieReadClient to support mor tables - add more tests to make
- // sure the data read is the updated data (compaction correctness)
- // TODO - add more test cases for compactions after a failed commit/compaction
}
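The rewritten test reflects the new two-step compaction contract: scheduling and execution are now separate table actions, and compact returns a result object exposing the write statuses instead of a bare RDD. A condensed, hedged restatement of that flow (instant handling simplified):

  // Hedged sketch: schedule, then run, a compaction on a MERGE_ON_READ table.
  HoodieTable<?> table = HoodieTable.create(config, jsc);
  String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
  Option<HoodieCompactionPlan> plan = table.scheduleCompaction(jsc, compactionInstantTime, Option.empty());
  if (plan.isPresent()) {
    table.getMetaClient().reloadActiveTimeline(); // pick up the requested compaction instant
    JavaRDD<WriteStatus> statuses = table.compact(jsc, compactionInstantTime).getWriteStatuses();
  }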
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/strategy/TestHoodieCompactionStrategy.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
similarity index 99%
rename from hudi-client/src/test/java/org/apache/hudi/table/compact/strategy/TestHoodieCompactionStrategy.java
rename to hudi-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
index 88da442..e49aafa 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/strategy/TestHoodieCompactionStrategy.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.table.compact.strategy;
+package org.apache.hudi.table.action.compact.strategy;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.common.model.HoodieBaseFile;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java
index 86b2303..060b5a3 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java
@@ -138,6 +138,20 @@ public class HoodieTestUtils {
}
}
+ public static void createDeltaCommitFiles(String basePath, String... instantTimes) throws IOException {
+ for (String instantTime : instantTimes) {
+ new File(
+ basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ + HoodieTimeline.makeRequestedDeltaFileName(instantTime)).createNewFile();
+ new File(
+ basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ + HoodieTimeline.makeInflightDeltaFileName(instantTime)).createNewFile();
+ new File(
+ basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeDeltaFileName(instantTime))
+ .createNewFile();
+ }
+ }
+
public static void createMetadataFolder(String basePath) {
new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME).mkdirs();
}
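A hedged usage note for the new helper: the compactor test above fabricates a completed delta commit before scheduling a compaction, and this method writes the requested/inflight/completed trio of timeline files that makes an instant appear complete:

  // Hedged sketch: fake a completed delta commit at instant "101" under basePath/.hoodie.
  HoodieTestUtils.createDeltaCommitFiles(basePath, "101");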