Posted to commits@hudi.apache.org by xu...@apache.org on 2023/01/17 15:24:25 UTC
[hudi] branch master updated: [HUDI-4148] Add client for Hudi table service manager (TSM) (#6732)
This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c9bc03ed868 [HUDI-4148] Add client for Hudi table service manager (TSM) (#6732)
c9bc03ed868 is described below
commit c9bc03ed8681ab64eea2520f5511464915389c51
Author: Zhaojing Yu <yu...@bytedance.com>
AuthorDate: Tue Jan 17 23:24:16 2023 +0800
[HUDI-4148] Add client for Hudi table service manager (TSM) (#6732)
Also refactor `Hoodie*Client` classes to separate table service APIs from write ones.
Co-authored-by: Raymond Xu <27...@users.noreply.github.com>
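In short, when the table service manager is enabled for an action, the write path only schedules the plan on the timeline and hands execution off to the TSM; otherwise the existing inline behaviour is kept. A condensed, illustrative sketch of that decision, using the method names from the new BaseHoodieTableServiceClient further down (imports and the enclosing class elided):

    // Condensed from BaseHoodieTableServiceClient#inlineCompaction below; illustrative only.
    private void inlineCompaction(HoodieTable table, Option<Map<String, String>> extraMetadata) {
      if (shouldDelegateToTableServiceManager(config, ActionType.compaction)) {
        // TSM mode: only write the compaction plan to the timeline; the external table
        // service manager (via HoodieTableServiceManagerClient) picks it up and executes it.
        scheduleCompaction(extraMetadata);
      } else {
        // Inline mode: finish any previously failed compactions, then schedule and execute here.
        runAnyPendingCompactions(table);
        inlineCompaction(extraMetadata);
      }
    }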
---
.../org/apache/hudi/client/BaseHoodieClient.java | 68 +-
.../hudi/client/BaseHoodieTableServiceClient.java | 832 +++++++++++++++++++++
.../apache/hudi/client/BaseHoodieWriteClient.java | 554 ++------------
.../client/HoodieTableServiceManagerClient.java | 169 +++++
.../org/apache/hudi/client/RunsTableService.java | 5 +
.../org/apache/hudi/config/HoodieWriteConfig.java | 20 +-
.../action/clean/CleanPlanActionExecutor.java | 12 +-
.../cluster/ClusteringPlanActionExecutor.java | 1 +
.../compact/ScheduleCompactionActionExecutor.java | 14 +-
.../BaseHoodieCompactionPlanGenerator.java | 3 +
.../hudi/client/HoodieFlinkTableServiceClient.java | 214 ++++++
.../apache/hudi/client/HoodieFlinkWriteClient.java | 138 +---
.../hudi/client/HoodieJavaTableServiceClient.java | 65 ++
.../apache/hudi/client/HoodieJavaWriteClient.java | 6 +-
.../hudi/client/HoodieSparkClusteringClient.java | 2 +-
.../hudi/client/SparkRDDTableServiceClient.java | 295 ++++++++
.../apache/hudi/client/SparkRDDWriteClient.java | 211 +-----
.../org/apache/hudi/client/TestClientRollback.java | 2 +-
.../TestHoodieClientOnCopyOnWriteStorage.java | 6 +-
.../config/HoodieTableServiceManagerConfig.java | 172 +++++
.../sink/clustering/HoodieFlinkClusteringJob.java | 2 +-
.../java/org/apache/hudi/util/ClusteringUtil.java | 4 +-
.../functional/SparkRDDWriteClientOverride.java | 38 -
.../org/apache/hudi/functional/TestBootstrap.java | 13 +-
.../apache/hudi/functional/TestOrcBootstrap.java | 20 +-
.../apache/hudi/utilities/HoodieClusteringJob.java | 4 +-
.../hudi/utilities/deltastreamer/DeltaSync.java | 2 +-
27 files changed, 1957 insertions(+), 915 deletions(-)
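At a high level, the refactor leaves BaseHoodieWriteClient holding an engine-specific BaseHoodieTableServiceClient and forwarding table-service work to it instead of running compaction, clustering and cleaning itself. A condensed, illustrative sketch of the write-client side of that split (constructors and most members omitted; see the BaseHoodieWriteClient.java hunks below for the actual changes):

    // Condensed sketch of the new split; illustrative only, not the full class.
    public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient implements RunsTableService {

      // New: engine-specific table service client (SparkRDD/Flink/Java variants are added in this commit).
      protected BaseHoodieTableServiceClient<O> tableServiceClient;

      protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata,
                                            Option<Map<String, String>> extraMetadata) {
        // Inline compaction, log compaction, clustering and their scheduling now live in
        // the table service client instead of the write client.
        tableServiceClient.runTableServicesInline(table, metadata, extraMetadata);
      }
    }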
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index b41747d83a8..d076184e24a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -20,14 +20,24 @@ package org.apache.hudi.client;
import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
-import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.client.utils.TransactionUtils;
+import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metrics.HoodieMetrics;
+import org.apache.hudi.table.HoodieTable;
+import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
@@ -35,6 +45,8 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
/**
* Abstract class taking care of holding common member variables (FileSystem, SparkContext, HoodieConfigs) Also, manages
@@ -47,9 +59,11 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
protected final transient FileSystem fs;
protected final transient HoodieEngineContext context;
protected final transient Configuration hadoopConf;
+ protected final transient HoodieMetrics metrics;
protected final HoodieWriteConfig config;
protected final String basePath;
protected final HoodieHeartbeatClient heartbeatClient;
+ protected final TransactionManager txnManager;
/**
* Timeline Server has the same lifetime as that of Client. Any operations done on the same timeline service will be
@@ -74,6 +88,8 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
shouldStopTimelineServer = !timelineServer.isPresent();
this.heartbeatClient = new HoodieHeartbeatClient(this.fs, this.basePath,
clientConfig.getHoodieClientHeartbeatIntervalInMs(), clientConfig.getHoodieClientHeartbeatTolerableMisses());
+ this.metrics = new HoodieMetrics(config);
+ this.txnManager = new TransactionManager(config, fs);
startEmbeddedServerView();
initWrapperFSMetrics();
}
@@ -85,6 +101,8 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
public void close() {
stopEmbeddedServerView(true);
this.context.setJobStatus("", "");
+ this.heartbeatClient.stop();
+ this.txnManager.close();
}
private synchronized void stopEmbeddedServerView(boolean resetViewStorageConfig) {
@@ -146,4 +164,52 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
public HoodieHeartbeatClient getHeartbeatClient() {
return heartbeatClient;
}
+
+ /**
+ * Resolve write conflicts before commit.
+ *
+ * @param table A hoodie table instance created after the transaction starts so that the latest commits and files are captured.
+ * @param metadata Current committing instant's metadata
+ * @param pendingInflightAndRequestedInstants Pending (inflight and requested) instants on the timeline, used during conflict resolution
+ * @see {@link BaseHoodieWriteClient#preCommit}
+ * @see {@link BaseHoodieTableServiceClient#preCommit}
+ */
+ protected void resolveWriteConflict(HoodieTable table, HoodieCommitMetadata metadata, Set<String> pendingInflightAndRequestedInstants) {
+ Timer.Context conflictResolutionTimer = metrics.getConflictResolutionCtx();
+ try {
+ TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
+ Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), false, pendingInflightAndRequestedInstants);
+ metrics.emitConflictResolutionSuccessful();
+ } catch (HoodieWriteConflictException e) {
+ metrics.emitConflictResolutionFailed();
+ throw e;
+ } finally {
+ if (conflictResolutionTimer != null) {
+ conflictResolutionTimer.stop();
+ }
+ }
+ }
+
+ /**
+ * Finalize Write operation.
+ *
+ * @param table HoodieTable
+ * @param instantTime Instant Time
+ * @param stats Hoodie Write Stat
+ */
+ protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieWriteStat> stats) {
+ try {
+ final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
+ table.finalizeWrite(context, instantTime, stats);
+ if (finalizeCtx != null) {
+ Option<Long> durationInMs = Option.of(metrics.getDurationInMs(finalizeCtx.stop()));
+ durationInMs.ifPresent(duration -> {
+ LOG.info("Finalize write elapsed time (milliseconds): " + duration);
+ metrics.updateFinalizeWriteMetrics(duration, stats.size());
+ });
+ }
+ } catch (HoodieIOException ioe) {
+ throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
+ }
+ }
}
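The two helpers added above, resolveWriteConflict and finalizeWrite, are now shared by the write client and the new table service client introduced in the next file. A rough sketch of how a subclass commit path is expected to call them, under the assumption of a simplified commitWithConflictResolution method (that method name and its local variables are illustrative, not from this patch; imports elided):

    // Hypothetical subclass commit path; only resolveWriteConflict, finalizeWrite and the
    // txnManager field come from the class above, the rest is illustrative.
    protected void commitWithConflictResolution(HoodieTable table, String instantTime,
                                                HoodieCommitMetadata metadata,
                                                List<HoodieWriteStat> stats,
                                                Set<String> pendingInflightAndRequestedInstants) {
      HoodieInstant inflight = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, instantTime);
      this.txnManager.beginTransaction(Option.of(inflight), Option.empty());
      try {
        // Fail the commit if a concurrent writer touched the same files (optimistic concurrency check).
        resolveWriteConflict(table, metadata, pendingInflightAndRequestedInstants);
        // Reconcile partially written or leftover files before the instant is published.
        finalizeWrite(table, instantTime, stats);
        // ... transition the instant to COMPLETED on the timeline ...
      } finally {
        this.txnManager.endTransaction(Option.of(inflight));
      }
    }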
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
new file mode 100644
index 00000000000..7ea70f63998
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -0,0 +1,832 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.async.AsyncArchiveService;
+import org.apache.hudi.async.AsyncCleanerService;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.client.heartbeat.HeartbeatUtils;
+import org.apache.hudi.common.HoodiePendingRollbackInfo;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.TableServiceType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.rollback.RollbackUtils;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
+public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient implements RunsTableService {
+
+ private static final Logger LOG = LogManager.getLogger(BaseHoodieTableServiceClient.class);
+
+ protected transient Timer.Context compactionTimer;
+ protected transient Timer.Context clusteringTimer;
+ protected transient Timer.Context logCompactionTimer;
+
+ protected transient AsyncCleanerService asyncCleanerService;
+ protected transient AsyncArchiveService asyncArchiveService;
+
+ protected Set<String> pendingInflightAndRequestedInstants;
+
+ protected BaseHoodieTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+ super(context, clientConfig, Option.empty());
+ }
+
+ protected void startAsyncCleanerService(BaseHoodieWriteClient writeClient) {
+ if (this.asyncCleanerService == null) {
+ this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(writeClient);
+ } else {
+ this.asyncCleanerService.start(null);
+ }
+ }
+
+ protected void startAsyncArchiveService(BaseHoodieWriteClient writeClient) {
+ if (this.asyncArchiveService == null) {
+ this.asyncArchiveService = AsyncArchiveService.startAsyncArchiveIfEnabled(writeClient);
+ } else {
+ this.asyncArchiveService.start(null);
+ }
+ }
+
+ protected void asyncClean() {
+ AsyncCleanerService.waitForCompletion(asyncCleanerService);
+ }
+
+ protected void asyncArchive() {
+ AsyncArchiveService.waitForCompletion(asyncArchiveService);
+ }
+
+ protected void setTableServiceTimer(WriteOperationType operationType) {
+ switch (operationType) {
+ case CLUSTER:
+ clusteringTimer = metrics.getClusteringCtx();
+ break;
+ case COMPACT:
+ compactionTimer = metrics.getCompactionCtx();
+ break;
+ case LOG_COMPACT:
+ logCompactionTimer = metrics.getLogCompactionCtx();
+ break;
+ default:
+ }
+ }
+
+ protected void setPendingInflightAndRequestedInstants(Set<String> pendingInflightAndRequestedInstants) {
+ this.pendingInflightAndRequestedInstants = pendingInflightAndRequestedInstants;
+ }
+
+ /**
+ * Any pre-commit actions, like conflict resolution, go here.
+ * @param metadata commit metadata for which pre commit is being invoked.
+ */
+ protected void preCommit(HoodieCommitMetadata metadata) {
+ // To be overridden by specific engines to perform conflict resolution if any.
+ }
+
+ /**
+ * Performs a compaction operation on a table, serially before or after an insert/upsert action.
+ * Scheduling and execution are done inline.
+ */
+ protected Option<String> inlineCompaction(Option<Map<String, String>> extraMetadata) {
+ Option<String> compactionInstantTimeOpt = inlineScheduleCompaction(extraMetadata);
+ compactionInstantTimeOpt.ifPresent(compactInstantTime -> {
+ // inline compaction should auto commit as the user is never given control
+ compact(compactInstantTime, true);
+ });
+ return compactionInstantTimeOpt;
+ }
+
+ private void inlineCompaction(HoodieTable table, Option<Map<String, String>> extraMetadata) {
+ if (shouldDelegateToTableServiceManager(config, ActionType.compaction)) {
+ scheduleCompaction(extraMetadata);
+ } else {
+ runAnyPendingCompactions(table);
+ inlineCompaction(extraMetadata);
+ }
+ }
+
+ /**
+ * Ensures compaction instant is in expected state and performs Log Compaction for the workload stored in instant-time.
+ *
+ * @param compactionInstantTime Compaction Instant Time
+ * @return Collection of Write Status
+ */
+ protected HoodieWriteMetadata<O> logCompact(String compactionInstantTime, boolean shouldComplete) {
+ throw new UnsupportedOperationException("Log compaction is not supported yet.");
+ }
+
+ /**
+ * Performs a log compaction operation on a table, serially before or after an insert/upsert action.
+ */
+ protected Option<String> inlineLogCompact(Option<Map<String, String>> extraMetadata) {
+ Option<String> logCompactionInstantTimeOpt = scheduleLogCompaction(extraMetadata);
+ logCompactionInstantTimeOpt.ifPresent(logCompactInstantTime -> {
+ // inline log compaction should auto commit as the user is never given control
+ logCompact(logCompactInstantTime, true);
+ });
+ return logCompactionInstantTimeOpt;
+ }
+
+ protected void runAnyPendingCompactions(HoodieTable table) {
+ table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().getInstants()
+ .forEach(instant -> {
+ LOG.info("Running previously failed inflight compaction at instant " + instant);
+ compact(instant.getTimestamp(), true);
+ });
+ }
+
+ protected void runAnyPendingLogCompactions(HoodieTable table) {
+ table.getActiveTimeline().getWriteTimeline().filterPendingLogCompactionTimeline().getInstantsAsStream()
+ .forEach(instant -> {
+ LOG.info("Running previously failed inflight log compaction at instant " + instant);
+ logCompact(instant.getTimestamp(), true);
+ });
+ }
+
+ /**
+ * Schedules compaction inline.
+ * @param extraMetadata extra metadata to be used.
+ * @return compaction instant if scheduled.
+ */
+ protected Option<String> inlineScheduleCompaction(Option<Map<String, String>> extraMetadata) {
+ return scheduleCompaction(extraMetadata);
+ }
+
+ /**
+ * Schedules a new compaction instant.
+ *
+ * @param extraMetadata Extra Metadata to be stored
+ */
+ public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
+ return scheduleCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+ }
+
+ /**
+ * Ensures compaction instant is in expected state and performs Compaction for the workload stored in instant-time.
+ *
+ * @param compactionInstantTime Compaction Instant Time
+ * @return Collection of Write Status
+ */
+ protected abstract HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean shouldComplete);
+
+ /**
+ * Commit a compaction operation. Allow passing additional meta-data to be stored in commit instant file.
+ *
+ * @param compactionInstantTime Compaction Instant Time
+ * @param metadata All the metadata that gets stored along with a commit
+ * @param extraMetadata Extra Metadata to be stored
+ */
+ public abstract void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata,
+ Option<Map<String, String>> extraMetadata);
+
+ /**
+ * Commit Compaction and track metrics.
+ */
+ protected abstract void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime);
+
+ /**
+ * Schedules a new log compaction instant.
+ * @param extraMetadata Extra Metadata to be stored
+ */
+ public Option<String> scheduleLogCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
+ return scheduleLogCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+ }
+
+ /**
+ * Schedules a new log compaction instant with passed-in instant time.
+ * @param instantTime Log Compaction Instant Time
+ * @param extraMetadata Extra Metadata to be stored
+ */
+ public boolean scheduleLogCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+ return scheduleTableService(instantTime, extraMetadata, TableServiceType.LOG_COMPACT).isPresent();
+ }
+
+ /**
+ * Performs Log Compaction for the workload stored in instant-time.
+ *
+ * @param logCompactionInstantTime Log Compaction Instant Time
+ * @return Collection of WriteStatus to inspect errors and counts
+ */
+ public HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime) {
+ return logCompact(logCompactionInstantTime, config.shouldAutoCommit());
+ }
+
+ /**
+ * Commit Log Compaction and track metrics.
+ */
+ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable table, String logCompactionCommitTime) {
+ throw new UnsupportedOperationException("Log compaction is not supported yet.");
+ }
+
+
+ /**
+ * Schedules a new compaction instant with passed-in instant time.
+ *
+ * @param instantTime Compaction Instant Time
+ * @param extraMetadata Extra Metadata to be stored
+ */
+ public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+ return scheduleTableService(instantTime, extraMetadata, TableServiceType.COMPACT).isPresent();
+ }
+
+ /**
+ * Schedules a new clustering instant.
+ *
+ * @param extraMetadata Extra Metadata to be stored
+ */
+ public Option<String> scheduleClustering(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
+ return scheduleClusteringAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+ }
+
+ /**
+ * Schedules a new clustering instant with passed-in instant time.
+ *
+ * @param instantTime clustering Instant Time
+ * @param extraMetadata Extra Metadata to be stored
+ */
+ public boolean scheduleClusteringAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+ return scheduleTableService(instantTime, extraMetadata, TableServiceType.CLUSTER).isPresent();
+ }
+
+ /**
+ * Schedules a new cleaning instant.
+ *
+ * @param extraMetadata Extra Metadata to be stored
+ */
+ protected Option<String> scheduleCleaning(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
+ return scheduleCleaningAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+ }
+
+ /**
+ * Schedules a new cleaning instant with passed-in instant time.
+ *
+ * @param instantTime cleaning Instant Time
+ * @param extraMetadata Extra Metadata to be stored
+ */
+ protected boolean scheduleCleaningAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+ return scheduleTableService(instantTime, extraMetadata, TableServiceType.CLEAN).isPresent();
+ }
+
+ /**
+ * Ensures clustering instant is in expected state and performs clustering for the plan stored in metadata.
+ *
+ * @param clusteringInstant Clustering Instant Time
+ * @return Collection of Write Status
+ */
+ public abstract HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldComplete);
+
+ protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+ if (!tableServicesEnabled(config)) {
+ return;
+ }
+
+ if (!config.areAnyTableServicesExecutedInline() && !config.areAnyTableServicesScheduledInline()) {
+ return;
+ }
+
+ if (config.isMetadataTableEnabled()) {
+ table.getHoodieView().sync();
+ }
+ // Do an inline compaction if enabled
+ if (config.inlineCompactionEnabled()) {
+ metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
+ inlineCompaction(table, extraMetadata);
+ } else {
+ metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
+ }
+
+ // if just inline schedule is enabled
+ if (!config.inlineCompactionEnabled() && config.scheduleInlineCompaction()
+ && table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().empty()) {
+ // proceed only if there are no pending compactions
+ metadata.addMetadata(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), "true");
+ inlineScheduleCompaction(extraMetadata);
+ }
+
+ // Do an inline log compaction if enabled
+ if (config.inlineLogCompactionEnabled()) {
+ runAnyPendingLogCompactions(table);
+ metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "true");
+ inlineLogCompact(extraMetadata);
+ } else {
+ metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "false");
+ }
+
+ // Do an inline clustering if enabled
+ if (config.inlineClusteringEnabled()) {
+ metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");
+ inlineClustering(table, extraMetadata);
+ } else {
+ metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "false");
+ }
+
+ // if just inline schedule is enabled
+ if (!config.inlineClusteringEnabled() && config.scheduleInlineClustering()
+ && table.getActiveTimeline().filterPendingReplaceTimeline().empty()) {
+ // proceed only if there are no pending clustering
+ metadata.addMetadata(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), "true");
+ inlineScheduleClustering(extraMetadata);
+ }
+ }
+
+ /**
+ * Schedule table services such as clustering, compaction & cleaning.
+ *
+ * @param extraMetadata Metadata to pass onto the scheduled service instant
+ * @param tableServiceType Type of table service to schedule
+ * @return the instant time if the table service was scheduled, or empty if not
+ */
+ public Option<String> scheduleTableService(String instantTime, Option<Map<String, String>> extraMetadata,
+ TableServiceType tableServiceType) {
+ // A lock is required to guard against race conditions between an on-going writer and scheduling a table service.
+ final Option<HoodieInstant> inflightInstant = Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED,
+ tableServiceType.getAction(), instantTime));
+ try {
+ this.txnManager.beginTransaction(inflightInstant, Option.empty());
+ LOG.info("Scheduling table service " + tableServiceType);
+ return scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType);
+ } finally {
+ this.txnManager.endTransaction(inflightInstant);
+ }
+ }
+
+ protected Option<String> scheduleTableServiceInternal(String instantTime, Option<Map<String, String>> extraMetadata,
+ TableServiceType tableServiceType) {
+ if (!tableServicesEnabled(config)) {
+ return Option.empty();
+ }
+
+ Option<String> option = Option.empty();
+ HoodieTable<?, ?, ?, ?> table = createTable(config, hadoopConf);
+
+ switch (tableServiceType) {
+ case ARCHIVE:
+ LOG.info("Scheduling archiving is not supported. Skipping.");
+ break;
+ case CLUSTER:
+ LOG.info("Scheduling clustering at instant time :" + instantTime);
+ Option<HoodieClusteringPlan> clusteringPlan = table
+ .scheduleClustering(context, instantTime, extraMetadata);
+ option = clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+ break;
+ case COMPACT:
+ LOG.info("Scheduling compaction at instant time :" + instantTime);
+ Option<HoodieCompactionPlan> compactionPlan = table
+ .scheduleCompaction(context, instantTime, extraMetadata);
+ option = compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+ break;
+ case LOG_COMPACT:
+ LOG.info("Scheduling log compaction at instant time :" + instantTime);
+ Option<HoodieCompactionPlan> logCompactionPlan = table
+ .scheduleLogCompaction(context, instantTime, extraMetadata);
+ option = logCompactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+ break;
+ case CLEAN:
+ LOG.info("Scheduling cleaning at instant time :" + instantTime);
+ Option<HoodieCleanerPlan> cleanerPlan = table
+ .scheduleCleaning(context, instantTime, extraMetadata);
+ option = cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid TableService " + tableServiceType);
+ }
+
+ Option<String> instantRange = delegateToTableServiceManager(tableServiceType, table);
+ if (instantRange.isPresent()) {
+ LOG.info("Delegate instant [" + instantRange.get() + "] to table service manager");
+ }
+
+ return option;
+ }
+
+ protected abstract HoodieTable<?, ?, ?, ?> createTable(HoodieWriteConfig config, Configuration hadoopConf);
+
+ /**
+ * Executes a clustering plan on a table, serially before or after an insert/upsert action.
+ * Schedules and executes clustering inline.
+ */
+ protected Option<String> inlineClustering(Option<Map<String, String>> extraMetadata) {
+ Option<String> clusteringInstantOpt = inlineScheduleClustering(extraMetadata);
+ clusteringInstantOpt.ifPresent(clusteringInstant -> {
+ // inline cluster should auto commit as the user is never given control
+ cluster(clusteringInstant, true);
+ });
+ return clusteringInstantOpt;
+ }
+
+ private void inlineClustering(HoodieTable table, Option<Map<String, String>> extraMetadata) {
+ if (shouldDelegateToTableServiceManager(config, ActionType.replacecommit)) {
+ scheduleClustering(extraMetadata);
+ } else {
+ runAnyPendingClustering(table);
+ inlineClustering(extraMetadata);
+ }
+ }
+
+ /**
+ * Schedules clustering inline.
+ *
+ * @param extraMetadata extra metadata to use.
+ * @return clustering instant if scheduled.
+ */
+ protected Option<String> inlineScheduleClustering(Option<Map<String, String>> extraMetadata) {
+ return scheduleClustering(extraMetadata);
+ }
+
+ protected void runAnyPendingClustering(HoodieTable table) {
+ table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().forEach(instant -> {
+ Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant);
+ if (instantPlan.isPresent()) {
+ LOG.info("Running pending clustering at instant " + instantPlan.get().getLeft());
+ cluster(instant.getTimestamp(), true);
+ }
+ });
+ }
+
+ /**
+ * Write the HoodieCommitMetadata to metadata table if available.
+ *
+ * @param table {@link HoodieTable} of interest.
+ * @param instantTime instant time of the commit.
+ * @param actionType action type of the commit.
+ * @param metadata instance of {@link HoodieCommitMetadata}.
+ */
+ protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
+ checkArgument(table.isTableServiceAction(actionType, instantTime), String.format("Unsupported action: %s.%s is not table service.", actionType, instantTime));
+ context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
+ table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime, true));
+ }
+
+ /**
+ * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
+ * configurations and CleaningPolicy used (typically, files that can no longer be used by a running query are
+ * cleaned). This API provides the flexibility to schedule the clean instant asynchronously via
+ * {@link BaseHoodieTableServiceClient#scheduleTableService(String, Option, TableServiceType)} and disable inline scheduling
+ * of clean.
+ *
+ * @param cleanInstantTime instant time for clean.
+ * @param scheduleInline true if needs to be scheduled inline. false otherwise.
+ * @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
+ */
+ @Nullable
+ public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline, boolean skipLocking) throws HoodieIOException {
+ if (!tableServicesEnabled(config)) {
+ return null;
+ }
+ final Timer.Context timerContext = metrics.getCleanCtx();
+ CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
+ HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));
+
+ HoodieTable table = createTable(config, hadoopConf);
+ if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
+ LOG.info("Cleaner started");
+ // proceed only if multiple clean schedules are enabled or if there are no pending cleans.
+ if (scheduleInline) {
+ scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
+ table.getMetaClient().reloadActiveTimeline();
+ }
+
+ if (shouldDelegateToTableServiceManager(config, ActionType.clean)) {
+ LOG.warn("Cleaning is not yet supported with Table Service Manager.");
+ return null;
+ }
+ }
+
+ // Proceeds to execute any requested or inflight clean instances in the timeline
+ HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime, skipLocking);
+ if (timerContext != null && metadata != null) {
+ long durationMs = metrics.getDurationInMs(timerContext.stop());
+ metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
+ LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
+ + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
+ + " cleanerElapsedMs" + durationMs);
+ }
+ return metadata;
+ }
+
+ /**
+ * Trigger archival for the table. This ensures that the number of commits does not keep
+ * increasing unbounded over time.
+ * @param table table to commit on.
+ * @param acquireLockForArchival true if lock has to be acquired for archival. false otherwise.
+ */
+ protected void archive(HoodieTable table, boolean acquireLockForArchival) {
+ if (!tableServicesEnabled(config)) {
+ return;
+ }
+ try {
+ // We cannot have unbounded commit files. Archive commits if we have to archive
+ HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table);
+ archiver.archiveIfRequired(context, acquireLockForArchival);
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Failed to archive", ioe);
+ }
+ }
+
+ /**
+ * Get the inflight timeline, excluding compaction and clustering instants.
+ * @param metaClient meta client of the table
+ * @return the filtered inflight timeline
+ */
+ private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieTableMetaClient metaClient) {
+ HoodieTimeline inflightTimelineWithReplaceCommit = metaClient.getCommitsTimeline().filterPendingExcludingCompaction();
+ HoodieTimeline inflightTimelineExcludeClusteringCommit = inflightTimelineWithReplaceCommit.filter(instant -> {
+ if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+ Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(metaClient, instant);
+ return !instantPlan.isPresent();
+ } else {
+ return true;
+ }
+ });
+ return inflightTimelineExcludeClusteringCommit;
+ }
+
+ protected Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback) {
+ return getPendingRollbackInfo(metaClient, commitToRollback, true);
+ }
+
+ public Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) {
+ return getPendingRollbackInfos(metaClient, ignoreCompactionAndClusteringInstants).getOrDefault(commitToRollback, Option.empty());
+ }
+
+ protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient) {
+ return getPendingRollbackInfos(metaClient, true);
+ }
+
+ /**
+ * Fetch a map of pending commits to be rolled back, keyed by instant time, to their {@link HoodiePendingRollbackInfo}.
+ * @param metaClient instance of {@link HoodieTableMetaClient} to use.
+ * @param ignoreCompactionAndClusteringInstants whether to skip compaction and clustering instants.
+ * @return map from commit instant time to its pending rollback instant and rollback plan pair.
+ */
+ protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient, boolean ignoreCompactionAndClusteringInstants) {
+ List<HoodieInstant> instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants();
+ Map<String, Option<HoodiePendingRollbackInfo>> infoMap = new HashMap<>();
+ for (HoodieInstant rollbackInstant : instants) {
+ HoodieRollbackPlan rollbackPlan;
+ try {
+ rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, rollbackInstant);
+ } catch (Exception e) {
+ if (rollbackInstant.isRequested()) {
+ LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ", deleting the plan since it's in REQUESTED state", e);
+ try {
+ metaClient.getActiveTimeline().deletePending(rollbackInstant);
+ } catch (HoodieIOException he) {
+ LOG.warn("Cannot delete " + rollbackInstant, he);
+ continue;
+ }
+ } else {
+ // Here we assume that if the rollback is inflight, the rollback plan is intact
+ // in instant.rollback.requested. The exception here can be due to other reasons.
+ LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ", skip the plan", e);
+ }
+ continue;
+ }
+
+ try {
+ String action = rollbackPlan.getInstantToRollback().getAction();
+ if (ignoreCompactionAndClusteringInstants) {
+ if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) {
+ boolean isClustering = HoodieTimeline.REPLACE_COMMIT_ACTION.equals(action)
+ && ClusteringUtils.getClusteringPlan(metaClient, new HoodieInstant(true, rollbackPlan.getInstantToRollback().getAction(),
+ rollbackPlan.getInstantToRollback().getCommitTime())).isPresent();
+ if (!isClustering) {
+ String instantToRollback = rollbackPlan.getInstantToRollback().getCommitTime();
+ infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
+ }
+ }
+ } else {
+ infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
+ }
+ } catch (Exception e) {
+ LOG.warn("Processing rollback plan failed for " + rollbackInstant + ", skip the plan", e);
+ }
+ }
+ return infoMap;
+ }
+
+ /**
+ * Rollback all failed writes.
+ */
+ protected Boolean rollbackFailedWrites() {
+ return rollbackFailedWrites(false);
+ }
+
+ /**
+ * Rollback all failed writes.
+ * @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
+ */
+ protected Boolean rollbackFailedWrites(boolean skipLocking) {
+ HoodieTable table = createTable(config, hadoopConf);
+ List<String> instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), Option.empty());
+ Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient());
+ instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
+ rollbackFailedWrites(pendingRollbacks, skipLocking);
+ return true;
+ }
+
+ protected void rollbackFailedWrites(Map<String, Option<HoodiePendingRollbackInfo>> instantsToRollback, boolean skipLocking) {
+ // sort in reverse order of commit times
+ LinkedHashMap<String, Option<HoodiePendingRollbackInfo>> reverseSortedRollbackInstants = instantsToRollback.entrySet()
+ .stream().sorted((i1, i2) -> i2.getKey().compareTo(i1.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new));
+ for (Map.Entry<String, Option<HoodiePendingRollbackInfo>> entry : reverseSortedRollbackInstants.entrySet()) {
+ if (HoodieTimeline.compareTimestamps(entry.getKey(), HoodieTimeline.LESSER_THAN_OR_EQUALS,
+ HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
+ // do we need to handle failed rollback of a bootstrap
+ rollbackFailedBootstrap();
+ HeartbeatUtils.deleteHeartbeatFile(fs, basePath, entry.getKey(), config);
+ break;
+ } else {
+ rollback(entry.getKey(), entry.getValue(), skipLocking);
+ HeartbeatUtils.deleteHeartbeatFile(fs, basePath, entry.getKey(), config);
+ }
+ }
+ }
+
+ protected List<String> getInstantsToRollback(HoodieTableMetaClient metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy, Option<String> curInstantTime) {
+ Stream<HoodieInstant> inflightInstantsStream = getInflightTimelineExcludeCompactionAndClustering(metaClient)
+ .getReverseOrderedInstants();
+ if (cleaningPolicy.isEager()) {
+ return inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> {
+ if (curInstantTime.isPresent()) {
+ return !entry.equals(curInstantTime.get());
+ } else {
+ return true;
+ }
+ }).collect(Collectors.toList());
+ } else if (cleaningPolicy.isLazy()) {
+ return inflightInstantsStream.filter(instant -> {
+ try {
+ return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
+ } catch (IOException io) {
+ throw new HoodieException("Failed to check heartbeat for instant " + instant, io);
+ }
+ }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+ } else if (cleaningPolicy.isNever()) {
+ return Collections.emptyList();
+ } else {
+ throw new IllegalArgumentException("Invalid Failed Writes Cleaning Policy " + config.getFailedWritesCleanPolicy());
+ }
+ }
+
+ /**
+ * @deprecated Rollback the inflight record changes with the given commit time. This
+ * will be removed in the future in favor of
+ * {@link BaseHoodieWriteClient#restoreToInstant(String, boolean)}.
+ *
+ * @param commitInstantTime Instant time of the commit
+ * @param pendingRollbackInfo pending rollback instant and plan if rollback failed from previous attempt.
+ * @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
+ * @throws HoodieRollbackException if rollback cannot be performed successfully
+ */
+ @Deprecated
+ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking) throws HoodieRollbackException {
+ LOG.info("Begin rollback of instant " + commitInstantTime);
+ final String rollbackInstantTime = pendingRollbackInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
+ final Timer.Context timerContext = this.metrics.getRollbackCtx();
+ try {
+ HoodieTable table = createTable(config, hadoopConf);
+ Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+ .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
+ .findFirst());
+ if (commitInstantOpt.isPresent() || pendingRollbackInfo.isPresent()) {
+ LOG.info(String.format("Scheduling Rollback at instant time : %s "
+ + "(exists in active timeline: %s), with rollback plan: %s",
+ rollbackInstantTime, commitInstantOpt.isPresent(), pendingRollbackInfo.isPresent()));
+ Option<HoodieRollbackPlan> rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan()))
+ .orElseGet(() -> table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers()));
+ if (rollbackPlanOption.isPresent()) {
+ // There can be a case where the inflight rollback failed after the instant files
+ // are deleted for commitInstantTime, so that commitInstantOpt is empty as it is
+ // not present in the timeline. In such a case, the hoodie instant instance
+ // is reconstructed to allow the rollback to be reattempted, and the deleteInstants
+ // is set to false since they are already deleted.
+ // Execute rollback
+ HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
+ ? table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, skipLocking)
+ : table.rollback(context, rollbackInstantTime, new HoodieInstant(
+ true, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
+ false, skipLocking);
+ if (timerContext != null) {
+ long durationInMs = metrics.getDurationInMs(timerContext.stop());
+ metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
+ }
+ return true;
+ } else {
+ throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime);
+ }
+ } else {
+ LOG.warn("Cannot find instant " + commitInstantTime + " in the timeline, for rollback");
+ return false;
+ }
+ } catch (Exception e) {
+ throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime, e);
+ }
+ }
+
+ /**
+ * Main API to rollback failed bootstrap.
+ */
+ public void rollbackFailedBootstrap() {
+ LOG.info("Rolling back pending bootstrap if present");
+ HoodieTable table = createTable(config, hadoopConf);
+ HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction();
+ Option<String> instant = Option.fromJavaOptional(
+ inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
+ if (instant.isPresent() && HoodieTimeline.compareTimestamps(instant.get(), HoodieTimeline.LESSER_THAN_OR_EQUALS,
+ HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
+ LOG.info("Found pending bootstrap instants. Rolling them back");
+ table.rollbackBootstrap(context, HoodieActiveTimeline.createNewInstantTime());
+ LOG.info("Finished rolling back pending bootstrap");
+ }
+ }
+
+ private Option<String> delegateToTableServiceManager(TableServiceType tableServiceType, HoodieTable table) {
+ if (!config.getTableServiceManagerConfig().isEnabledAndActionSupported(ActionType.compaction)) {
+ return Option.empty();
+ }
+ HoodieTableServiceManagerClient tableServiceManagerClient = new HoodieTableServiceManagerClient(table.getMetaClient(), config.getTableServiceManagerConfig());
+ switch (tableServiceType) {
+ case COMPACT:
+ return tableServiceManagerClient.executeCompaction();
+ case CLUSTER:
+ return tableServiceManagerClient.executeClustering();
+ case CLEAN:
+ return tableServiceManagerClient.executeClean();
+ default:
+ LOG.info("Delegation to table service manager is not supported for table service type: " + tableServiceType.getAction());
+ return Option.empty();
+ }
+ }
+
+ @Override
+ public void close() {
+ AsyncArchiveService.forceShutdown(asyncArchiveService);
+ asyncArchiveService = null;
+ AsyncCleanerService.forceShutdown(asyncCleanerService);
+ asyncCleanerService = null;
+ // Stop timeline-server if running
+ super.close();
+ }
+}
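For orientation, the abstract surface an engine has to fill in when extending BaseHoodieTableServiceClient is small: createTable, compact, commitCompaction, completeCompaction and cluster. A minimal, purely illustrative skeleton, assuming no further abstract members are inherited (the real implementations added by this commit are SparkRDDTableServiceClient, HoodieFlinkTableServiceClient and HoodieJavaTableServiceClient):

    // Illustrative skeleton only; method bodies throw so the sketch stays compilable.
    package org.apache.hudi.client; // hypothetical engine module reusing the client package

    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.table.HoodieTable;
    import org.apache.hudi.table.action.HoodieWriteMetadata;

    public class MyEngineTableServiceClient extends BaseHoodieTableServiceClient<List<WriteStatus>> {

      protected MyEngineTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
        super(context, clientConfig);
      }

      @Override
      protected HoodieTable<?, ?, ?, ?> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
        throw new UnsupportedOperationException("engine-specific: build the HoodieTable for this engine");
      }

      @Override
      protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
        throw new UnsupportedOperationException("engine-specific: execute the scheduled compaction plan");
      }

      @Override
      public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata,
                                   Option<Map<String, String>> extraMetadata) {
        throw new UnsupportedOperationException("engine-specific: complete the compaction instant");
      }

      @Override
      protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) {
        throw new UnsupportedOperationException("engine-specific: commit the compaction and emit metrics");
      }

      @Override
      public HoodieWriteMetadata<List<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
        throw new UnsupportedOperationException("engine-specific: execute the clustering plan");
      }
    }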
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 04b70d5545c..b361f8918c4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -18,29 +18,23 @@
package org.apache.hudi.client;
-import org.apache.hudi.async.AsyncArchiveService;
-import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.avro.model.HoodieClusteringPlan;
-import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.callback.HoodieWriteCommitCallback;
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
-import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
@@ -55,15 +49,12 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CleanerUtils;
-import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieArchivalConfig;
-import org.apache.hudi.config.HoodieClusteringConfig;
-import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
@@ -90,7 +81,6 @@ import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.restore.RestoreUtils;
-import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.hudi.table.action.savepoint.SavepointHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
@@ -108,13 +98,10 @@ import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY;
@@ -128,8 +115,7 @@ import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY;
* @param <K> Type of keys
* @param <O> Type of outputs
*/
-public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
- implements RunsTableService {
+public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient implements RunsTableService {
protected static final String LOOKUP_STR = "lookup";
private static final long serialVersionUID = 1L;
@@ -140,18 +126,13 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
private transient WriteOperationType operationType;
private transient HoodieWriteCommitCallback commitCallback;
- protected final transient HoodieMetrics metrics;
protected transient Timer.Context writeTimer = null;
- protected transient Timer.Context compactionTimer;
- protected transient Timer.Context clusteringTimer;
- protected transient Timer.Context logCompactionTimer;
- protected transient AsyncCleanerService asyncCleanerService;
- protected transient AsyncArchiveService asyncArchiveService;
- protected final TransactionManager txnManager;
protected Option<Pair<HoodieInstant, Map<String, String>>> lastCompletedTxnAndMetadata = Option.empty();
protected Set<String> pendingInflightAndRequestedInstants;
+ protected BaseHoodieTableServiceClient<O> tableServiceClient;
+
/**
* Create a write client, with new hudi index.
* @param context HoodieEngineContext
@@ -178,9 +159,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
Option<EmbeddedTimelineService> timelineService,
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
super(context, writeConfig, timelineService);
- this.metrics = new HoodieMetrics(config);
this.index = createIndex(writeConfig);
- this.txnManager = new TransactionManager(config, fs);
this.upgradeDowngradeHelper = upgradeDowngradeHelper;
}
@@ -194,6 +173,10 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
return this.operationType;
}
+ public BaseHoodieTableServiceClient<O> getTableServiceClient() {
+ return tableServiceClient;
+ }
+
/**
* Commit changes performed at the given instantTime marker.
*/
@@ -363,9 +346,12 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
* @param metadata instance of {@link HoodieCommitMetadata}.
*/
protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
- context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
- table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
- table.isTableServiceAction(actionType, instantTime)));
+ if (table.isTableServiceAction(actionType, instantTime)) {
+ tableServiceClient.writeTableMetadata(table, instantTime, actionType, metadata);
+ } else {
+ context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
+ table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime, false));
+ }
}
/**
@@ -385,27 +371,10 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
throw new HoodieException("Cannot bootstrap the table in multi-writer mode");
}
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UPSERT, Option.ofNullable(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS));
- rollbackFailedBootstrap();
+ tableServiceClient.rollbackFailedBootstrap();
table.bootstrap(context, extraMetadata);
}
- /**
- * Main API to rollback failed bootstrap.
- */
- protected void rollbackFailedBootstrap() {
- LOG.info("Rolling back pending bootstrap if present");
- HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
- HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction();
- Option<String> instant = Option.fromJavaOptional(
- inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
- if (instant.isPresent() && HoodieTimeline.compareTimestamps(instant.get(), HoodieTimeline.LESSER_THAN_OR_EQUALS,
- HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
- LOG.info("Found pending bootstrap instants. Rolling them back");
- table.rollbackBootstrap(context, HoodieActiveTimeline.createNewInstantTime());
- LOG.info("Finished rolling back pending bootstrap");
- }
- }
-
/**
* Upsert a batch of new records into Hoodie table at the supplied instantTime.
*
@@ -524,16 +493,9 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
? TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient) : Option.empty();
this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient);
this.pendingInflightAndRequestedInstants.remove(instantTime);
- if (null == this.asyncCleanerService) {
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
- } else {
- this.asyncCleanerService.start(null);
- }
- if (null == this.asyncArchiveService) {
- this.asyncArchiveService = AsyncArchiveService.startAsyncArchiveIfEnabled(this);
- } else {
- this.asyncArchiveService.start(null);
- }
+ tableServiceClient.setPendingInflightAndRequestedInstants(this.pendingInflightAndRequestedInstants);
+ tableServiceClient.startAsyncCleanerService(this);
+ tableServiceClient.startAsyncArchiveService(this);
}
/**
@@ -568,82 +530,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
}
protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
- if (!tableServicesEnabled(config)) {
- return;
- }
- if (config.areAnyTableServicesExecutedInline() || config.areAnyTableServicesScheduledInline()) {
- if (config.isMetadataTableEnabled()) {
- table.getHoodieView().sync();
- }
- // Do an inline compaction if enabled
- if (config.inlineCompactionEnabled()) {
- runAnyPendingCompactions(table);
- metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
- inlineCompaction(extraMetadata);
- } else {
- metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
- }
-
- // if just inline schedule is enabled
- if (!config.inlineCompactionEnabled() && config.scheduleInlineCompaction()
- && table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().empty()) {
- // proceed only if there are no pending compactions
- metadata.addMetadata(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), "true");
- inlineScheduleCompaction(extraMetadata);
- }
-
- // Do an inline log compaction if enabled
- if (config.inlineLogCompactionEnabled()) {
- runAnyPendingLogCompactions(table);
- metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "true");
- inlineLogCompact(extraMetadata);
- } else {
- metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "false");
- }
-
- // Do an inline clustering if enabled
- if (config.inlineClusteringEnabled()) {
- runAnyPendingClustering(table);
- metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");
- inlineClustering(extraMetadata);
- } else {
- metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "false");
- }
-
- // if just inline schedule is enabled
- if (!config.inlineClusteringEnabled() && config.scheduleInlineClustering()
- && table.getActiveTimeline().filterPendingReplaceTimeline().empty()) {
- // proceed only if there are no pending clustering
- metadata.addMetadata(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), "true");
- inlineScheduleClustering(extraMetadata);
- }
- }
- }
-
- protected void runAnyPendingCompactions(HoodieTable table) {
- table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().getInstants()
- .forEach(instant -> {
- LOG.info("Running previously failed inflight compaction at instant " + instant);
- compact(instant.getTimestamp(), true);
- });
- }
-
- protected void runAnyPendingLogCompactions(HoodieTable table) {
- table.getActiveTimeline().getWriteTimeline().filterPendingLogCompactionTimeline().getInstantsAsStream()
- .forEach(instant -> {
- LOG.info("Running previously failed inflight log compaction at instant " + instant);
- logCompact(instant.getTimestamp(), true);
- });
- }
-
- protected void runAnyPendingClustering(HoodieTable table) {
- table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().forEach(instant -> {
- Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant);
- if (instantPlan.isPresent()) {
- LOG.info("Running pending clustering at instant " + instantPlan.get().getLeft());
- cluster(instant.getTimestamp(), true);
- }
- });
+ tableServiceClient.runTableServicesInline(table, metadata, extraMetadata);
}
protected void autoCleanOnCommit() {
@@ -653,7 +540,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
if (config.isAsyncClean()) {
LOG.info("Async cleaner has been spawned. Waiting for it to finish");
- AsyncCleanerService.waitForCompletion(asyncCleanerService);
+ tableServiceClient.asyncClean();
LOG.info("Async cleaner has finished");
} else {
LOG.info("Start to clean synchronously.");
@@ -669,7 +556,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
if (config.isAsyncArchive()) {
LOG.info("Async archiver has been spawned. Waiting for it to finish");
- AsyncArchiveService.waitForCompletion(asyncArchiveService);
+ tableServiceClient.asyncArchive();
LOG.info("Async archiver has finished");
} else {
LOG.info("Start to archive synchronously.");
@@ -681,7 +568,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
* Run any pending compactions.
*/
public void runAnyPendingCompactions() {
- runAnyPendingCompactions(createTable(config, hadoopConf));
+ tableServiceClient.runAnyPendingCompactions(createTable(config, hadoopConf));
}
/**
@@ -774,63 +661,8 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
@Deprecated
public boolean rollback(final String commitInstantTime) throws HoodieRollbackException {
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty());
- Option<HoodiePendingRollbackInfo> pendingRollbackInfo = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
- return rollback(commitInstantTime, pendingRollbackInfo, false);
- }
-
- /**
- * @Deprecated
- * Rollback the inflight record changes with the given commit time. This
- * will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String, boolean)
- *
- * @param commitInstantTime Instant time of the commit
- * @param pendingRollbackInfo pending rollback instant and plan if rollback failed from previous attempt.
- * @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
- * @throws HoodieRollbackException if rollback cannot be performed successfully
- */
- @Deprecated
- public boolean rollback(final String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking) throws HoodieRollbackException {
- LOG.info("Begin rollback of instant " + commitInstantTime);
- final String rollbackInstantTime = pendingRollbackInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
- final Timer.Context timerContext = this.metrics.getRollbackCtx();
- try {
- HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
- Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
- .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
- .findFirst());
- if (commitInstantOpt.isPresent() || pendingRollbackInfo.isPresent()) {
- LOG.info(String.format("Scheduling Rollback at instant time : %s "
- + "(exists in active timeline: %s), with rollback plan: %s",
- rollbackInstantTime, commitInstantOpt.isPresent(), pendingRollbackInfo.isPresent()));
- Option<HoodieRollbackPlan> rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan()))
- .orElseGet(() -> table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers()));
- if (rollbackPlanOption.isPresent()) {
- // There can be a case where the inflight rollback failed after the instant files
- // are deleted for commitInstantTime, so that commitInstantOpt is empty as it is
- // not present in the timeline. In such a case, the hoodie instant instance
- // is reconstructed to allow the rollback to be reattempted, and the deleteInstants
- // is set to false since they are already deleted.
- // Execute rollback
- HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
- ? table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, skipLocking)
- : table.rollback(context, rollbackInstantTime, new HoodieInstant(
- true, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
- false, skipLocking);
- if (timerContext != null) {
- long durationInMs = metrics.getDurationInMs(timerContext.stop());
- metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
- }
- return true;
- } else {
- throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime);
- }
- } else {
- LOG.warn("Cannot find instant " + commitInstantTime + " in the timeline, for rollback");
- return false;
- }
- } catch (Exception e) {
- throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime, e);
- }
+ Option<HoodiePendingRollbackInfo> pendingRollbackInfo = tableServiceClient.getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+ return tableServiceClient.rollback(commitInstantTime, pendingRollbackInfo, false);
}
/**
@@ -912,33 +744,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
*/
public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline, boolean skipLocking) throws HoodieIOException {
- if (!tableServicesEnabled(config)) {
- return null;
- }
- final Timer.Context timerContext = metrics.getCleanCtx();
- CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
- HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));
-
- HoodieTable table = createTable(config, hadoopConf);
- if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
- LOG.info("Cleaner started");
- // proceed only if multiple clean schedules are enabled or if there are no pending cleans.
- if (scheduleInline) {
- scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
- table.getMetaClient().reloadActiveTimeline();
- }
- }
-
- // Proceeds to execute any requested or inflight clean instances in the timeline
- HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime, skipLocking);
- if (timerContext != null && metadata != null) {
- long durationMs = metrics.getDurationInMs(timerContext.stop());
- metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
- LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
- + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
- + " cleanerElapsedMs" + durationMs);
- }
- return metadata;
+ return tableServiceClient.clean(cleanInstantTime, scheduleInline, skipLocking);
}
public HoodieCleanMetadata clean() {
@@ -962,16 +768,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
* @param acquireLockForArchival true if lock has to be acquired for archival. false otherwise.
*/
protected void archive(HoodieTable table, boolean acquireLockForArchival) {
- if (!tableServicesEnabled(config)) {
- return;
- }
- try {
- // We cannot have unbounded commit files. Archive commits if we have to archive
- HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table);
- archiver.archiveIfRequired(context, acquireLockForArchival);
- } catch (IOException ioe) {
- throw new HoodieIOException("Failed to archive", ioe);
- }
+ tableServiceClient.archive(table, acquireLockForArchival);
}
/**
@@ -997,7 +794,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
*/
public String startCommit(String actionType, HoodieTableMetaClient metaClient) {
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
- HoodieTimeline.COMMIT_ACTION, () -> rollbackFailedWrites());
+ HoodieTimeline.COMMIT_ACTION, () -> tableServiceClient.rollbackFailedWrites());
String instantTime = HoodieActiveTimeline.createNewInstantTime();
startCommit(instantTime, actionType, metaClient);
return instantTime;
@@ -1025,7 +822,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
*/
private void startCommitWithTime(String instantTime, String actionType, HoodieTableMetaClient metaClient) {
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
- HoodieTimeline.COMMIT_ACTION, () -> rollbackFailedWrites());
+ HoodieTimeline.COMMIT_ACTION, () -> tableServiceClient.rollbackFailedWrites());
startCommit(instantTime, actionType, metaClient);
}
@@ -1114,6 +911,19 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
}
}
+ /**
+ * Performs Clustering for the workload stored in instant-time.
+ *
+ * @param clusteringInstantTime Clustering Instant Time
+ * @return Collection of WriteStatus to inspect errors and counts
+ */
+ public HoodieWriteMetadata<O> cluster(String clusteringInstantTime) {
+ if (shouldDelegateToTableServiceManager(config, ActionType.replacecommit)) {
+ throw new UnsupportedOperationException("Clustering should be delegated to table service manager instead of direct run.");
+ }
+ return cluster(clusteringInstantTime, true);
+ }
+
/**
* Performs Compaction for the workload stored in instant-time.
*
@@ -1121,6 +931,9 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
* @return Collection of WriteStatus to inspect errors and counts
*/
public HoodieWriteMetadata<O> compact(String compactionInstantTime) {
+ if (shouldDelegateToTableServiceManager(config, ActionType.compaction)) {
+ throw new UnsupportedOperationException("Compaction should be delegated to table service manager instead of direct run.");
+ }
return compact(compactionInstantTime, config.shouldAutoCommit());
}
@@ -1186,152 +999,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
throw new UnsupportedOperationException("Log compaction is not supported yet.");
}
- /**
- * Get inflight time line exclude compaction and clustering.
- * @param metaClient
- * @return
- */
- private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieTableMetaClient metaClient) {
- HoodieTimeline inflightTimelineWithReplaceCommit = metaClient.getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction();
- HoodieTimeline inflightTimelineExcludeClusteringCommit = inflightTimelineWithReplaceCommit.filter(instant -> {
- if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
- Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(metaClient, instant);
- return !instantPlan.isPresent();
- } else {
- return true;
- }
- });
- return inflightTimelineExcludeClusteringCommit;
- }
-
- protected Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback) {
- return getPendingRollbackInfo(metaClient, commitToRollback, true);
- }
-
- public Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) {
- return getPendingRollbackInfos(metaClient, ignoreCompactionAndClusteringInstants).getOrDefault(commitToRollback, Option.empty());
- }
-
- protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient) {
- return getPendingRollbackInfos(metaClient, true);
- }
-
- /**
- * Fetch map of pending commits to be rolled-back to {@link HoodiePendingRollbackInfo}.
- * @param metaClient instance of {@link HoodieTableMetaClient} to use.
- * @return map of pending commits to be rolled-back instants to Rollback Instant and Rollback plan Pair.
- */
- protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient, boolean ignoreCompactionAndClusteringInstants) {
- List<HoodieInstant> instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants();
- Map<String, Option<HoodiePendingRollbackInfo>> infoMap = new HashMap<>();
- for (HoodieInstant rollbackInstant : instants) {
- HoodieRollbackPlan rollbackPlan;
- try {
- rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, rollbackInstant);
- } catch (Exception e) {
- if (rollbackInstant.isRequested()) {
- LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ", deleting the plan since it's in REQUESTED state", e);
- try {
- metaClient.getActiveTimeline().deletePending(rollbackInstant);
- } catch (HoodieIOException he) {
- LOG.warn("Cannot delete " + rollbackInstant, he);
- continue;
- }
- } else {
- // Here we assume that if the rollback is inflight, the rollback plan is intact
- // in instant.rollback.requested. The exception here can be due to other reasons.
- LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ", skip the plan", e);
- }
- continue;
- }
-
- try {
- String action = rollbackPlan.getInstantToRollback().getAction();
- if (ignoreCompactionAndClusteringInstants) {
- if (!HoodieTimeline.COMPACTION_ACTION.equals(action) && !HoodieTimeline.LOG_COMPACTION_ACTION.equals(action)) {
- boolean isClustering = HoodieTimeline.REPLACE_COMMIT_ACTION.equals(action)
- && ClusteringUtils.getClusteringPlan(metaClient, new HoodieInstant(true, rollbackPlan.getInstantToRollback().getAction(),
- rollbackPlan.getInstantToRollback().getCommitTime())).isPresent();
- if (!isClustering) {
- String instantToRollback = rollbackPlan.getInstantToRollback().getCommitTime();
- infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
- }
- }
- } else {
- infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
- }
- } catch (Exception e) {
- LOG.warn("Processing rollback plan failed for " + rollbackInstant + ", skip the plan", e);
- }
- }
- return infoMap;
- }
-
- /**
- * Rollback all failed writes.
- */
- protected Boolean rollbackFailedWrites() {
- return rollbackFailedWrites(false);
- }
-
- /**
- * Rollback all failed writes.
- * @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
- */
- protected Boolean rollbackFailedWrites(boolean skipLocking) {
- HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
- List<String> instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), Option.empty());
- Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient());
- instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
- rollbackFailedWrites(pendingRollbacks, skipLocking);
- return true;
- }
-
- protected void rollbackFailedWrites(Map<String, Option<HoodiePendingRollbackInfo>> instantsToRollback, boolean skipLocking) {
- // sort in reverse order of commit times
- LinkedHashMap<String, Option<HoodiePendingRollbackInfo>> reverseSortedRollbackInstants = instantsToRollback.entrySet()
- .stream().sorted((i1, i2) -> i2.getKey().compareTo(i1.getKey()))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new));
- for (Map.Entry<String, Option<HoodiePendingRollbackInfo>> entry : reverseSortedRollbackInstants.entrySet()) {
- if (HoodieTimeline.compareTimestamps(entry.getKey(), HoodieTimeline.LESSER_THAN_OR_EQUALS,
- HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
- // do we need to handle failed rollback of a bootstrap
- rollbackFailedBootstrap();
- HeartbeatUtils.deleteHeartbeatFile(fs, basePath, entry.getKey(), config);
- break;
- } else {
- rollback(entry.getKey(), entry.getValue(), skipLocking);
- HeartbeatUtils.deleteHeartbeatFile(fs, basePath, entry.getKey(), config);
- }
- }
- }
-
- protected List<String> getInstantsToRollback(HoodieTableMetaClient metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy, Option<String> curInstantTime) {
- Stream<HoodieInstant> inflightInstantsStream = getInflightTimelineExcludeCompactionAndClustering(metaClient)
- .getReverseOrderedInstants();
- if (cleaningPolicy.isEager()) {
- return inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> {
- if (curInstantTime.isPresent()) {
- return !entry.equals(curInstantTime.get());
- } else {
- return true;
- }
- }).collect(Collectors.toList());
- } else if (cleaningPolicy.isLazy()) {
- return inflightInstantsStream.filter(instant -> {
- try {
- return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
- } catch (IOException io) {
- throw new HoodieException("Failed to check heartbeat for instant " + instant, io);
- }
- }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
- } else if (cleaningPolicy.isNever()) {
- return Collections.EMPTY_LIST;
- } else {
- throw new IllegalArgumentException("Invalid Failed Writes Cleaning Policy " + config.getFailedWritesCleanPolicy());
- }
- }
-
/**
* Ensures compaction instant is in expected state and performs Compaction for the workload stored in instant-time.
*
@@ -1340,19 +1007,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
*/
protected abstract HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean shouldComplete);
- /**
- * Performs a compaction operation on a table, serially before or after an insert/upsert action.
- * Scheduling and execution is done inline.
- */
- protected Option<String> inlineCompaction(Option<Map<String, String>> extraMetadata) {
- Option<String> compactionInstantTimeOpt = inlineScheduleCompaction(extraMetadata);
- compactionInstantTimeOpt.ifPresent(compactInstantTime -> {
- // inline compaction should auto commit as the user is never given control
- compact(compactInstantTime, true);
- });
- return compactionInstantTimeOpt;
- }
-
/***
* Schedules compaction inline.
* @param extraMetadata extra metadata to be used.
@@ -1372,18 +1026,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
throw new UnsupportedOperationException("Log compaction is not supported yet.");
}
- /**
- * Performs a log compaction operation on a table, serially before or after an insert/upsert action.
- */
- protected Option<String> inlineLogCompact(Option<Map<String, String>> extraMetadata) {
- Option<String> logCompactionInstantTimeOpt = scheduleLogCompaction(extraMetadata);
- logCompactionInstantTimeOpt.ifPresent(logCompactInstantTime -> {
- // inline log compaction should auto commit as the user is never given control
- logCompact(logCompactInstantTime, true);
- });
- return logCompactionInstantTimeOpt;
- }
-
/**
* Schedules a new clustering instant.
* @param extraMetadata Extra Metadata to be stored
@@ -1402,15 +1044,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
return scheduleTableService(instantTime, extraMetadata, TableServiceType.CLUSTER).isPresent();
}
- /**
- * Schedules a new cleaning instant.
- * @param extraMetadata Extra Metadata to be stored
- */
- protected Option<String> scheduleCleaning(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
- String instantTime = HoodieActiveTimeline.createNewInstantTime();
- return scheduleCleaningAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
- }
-
/**
* Schedules a new cleaning instant with passed-in instant time.
* @param instantTime cleaning Instant Time
@@ -1454,91 +1087,12 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
try {
this.txnManager.beginTransaction(inflightInstant, Option.empty());
LOG.info("Scheduling table service " + tableServiceType);
- return scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType);
+ return tableServiceClient.scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType);
} finally {
this.txnManager.endTransaction(inflightInstant);
}
}
- private Option<String> scheduleTableServiceInternal(String instantTime, Option<Map<String, String>> extraMetadata,
- TableServiceType tableServiceType) {
- if (!tableServicesEnabled(config)) {
- return Option.empty();
- }
- switch (tableServiceType) {
- case ARCHIVE:
- LOG.info("Scheduling archiving is not supported. Skipping.");
- return Option.empty();
- case CLUSTER:
- LOG.info("Scheduling clustering at instant time :" + instantTime);
- Option<HoodieClusteringPlan> clusteringPlan = createTable(config, hadoopConf)
- .scheduleClustering(context, instantTime, extraMetadata);
- return clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty();
- case COMPACT:
- LOG.info("Scheduling compaction at instant time :" + instantTime);
- Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf)
- .scheduleCompaction(context, instantTime, extraMetadata);
- return compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
- case LOG_COMPACT:
- LOG.info("Scheduling log compaction at instant time :" + instantTime);
- Option<HoodieCompactionPlan> logCompactionPlan = createTable(config, hadoopConf)
- .scheduleLogCompaction(context, instantTime, extraMetadata);
- return logCompactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
- case CLEAN:
- LOG.info("Scheduling cleaning at instant time :" + instantTime);
- Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf)
- .scheduleCleaning(context, instantTime, extraMetadata);
- return cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
- default:
- throw new IllegalArgumentException("Invalid TableService " + tableServiceType);
- }
- }
-
- /**
- * Executes a clustering plan on a table, serially before or after an insert/upsert action.
- * Schedules and executes clustering inline.
- */
- protected Option<String> inlineClustering(Option<Map<String, String>> extraMetadata) {
- Option<String> clusteringInstantOpt = inlineScheduleClustering(extraMetadata);
- clusteringInstantOpt.ifPresent(clusteringInstant -> {
- // inline cluster should auto commit as the user is never given control
- cluster(clusteringInstant, true);
- });
- return clusteringInstantOpt;
- }
-
- /**
- * Schedules clustering inline.
- * @param extraMetadata extrametadata to use.
- * @return clustering instant if scheduled.
- */
- protected Option<String> inlineScheduleClustering(Option<Map<String, String>> extraMetadata) {
- return scheduleClustering(extraMetadata);
- }
-
- /**
- * Finalize Write operation.
- *
- * @param table HoodieTable
- * @param instantTime Instant Time
- * @param stats Hoodie Write Stat
- */
- protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieWriteStat> stats) {
- try {
- final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
- table.finalizeWrite(context, instantTime, stats);
- if (finalizeCtx != null) {
- Option<Long> durationInMs = Option.of(metrics.getDurationInMs(finalizeCtx.stop()));
- durationInMs.ifPresent(duration -> {
- LOG.info("Finalize write elapsed time (milliseconds): " + duration);
- metrics.updateFinalizeWriteMetrics(duration, stats.size());
- });
- }
- } catch (HoodieIOException ioe) {
- throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
- }
- }
-
public HoodieMetrics getMetrics() {
return metrics;
}
@@ -1619,10 +1173,9 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
setWriteTimer(table);
break;
case CLUSTER:
- clusteringTimer = metrics.getClusteringCtx();
- break;
case COMPACT:
- compactionTimer = metrics.getCompactionCtx();
+ case LOG_COMPACT:
+ tableServiceClient.setTableServiceTimer(operationType);
break;
default:
}
@@ -1670,17 +1223,12 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
@Override
public void close() {
- AsyncArchiveService.forceShutdown(asyncArchiveService);
- asyncArchiveService = null;
- AsyncCleanerService.forceShutdown(asyncCleanerService);
- asyncCleanerService = null;
// Stop timeline-server if running
super.close();
// Calling this here releases any resources used by your index, so make sure to finish any related operations
// before this point
this.index.close();
- this.heartbeatClient.stop();
- this.txnManager.close();
+ this.tableServiceClient.close();
}
private void setWriteTimer(HoodieTable table) {
@@ -1699,13 +1247,13 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
metaClient = HoodieTableMetaClient.reload(metaClient);
// Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits
- List<String> instantsToRollback = getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, instantTime);
+ List<String> instantsToRollback = tableServiceClient.getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, instantTime);
if (!instantsToRollback.isEmpty()) {
- Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(metaClient);
+ Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = tableServiceClient.getPendingRollbackInfos(metaClient);
instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
- rollbackFailedWrites(pendingRollbacks, true);
+ tableServiceClient.rollbackFailedWrites(pendingRollbacks, true);
}
new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper)
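For illustration, a minimal sketch of a caller honoring the new delegation guard introduced above: when the table service manager owns an action, the public compact(instantTime) and cluster(instantTime) entry points now throw UnsupportedOperationException, so inline execution should only happen when the manager is not enabled for that action. The SparkRDDWriteClient usage and the helper name runCompactionIfOwned are assumptions for the sketch, not part of this commit.

    import org.apache.hudi.client.SparkRDDWriteClient;
    import org.apache.hudi.common.model.ActionType;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class CompactionDispatchExample {
      // Hypothetical helper: choose between inline compaction and TSM-owned compaction.
      static void runCompactionIfOwned(HoodieWriteConfig config, SparkRDDWriteClient<?> writeClient) {
        if (config.getTableServiceManagerConfig().isEnabledAndActionSupported(ActionType.compaction)) {
          // The TSM owns compaction: compact(instantTime) would throw
          // UnsupportedOperationException, so leave execution to the manager.
          return;
        }
        // TSM not enabled for compaction: schedule and execute inline as before.
        Option<String> instantTime = writeClient.scheduleCompaction(Option.empty());
        instantTime.ifPresent(writeClient::compact);
      }
    }

The same check applies to clustering, which is guarded via ActionType.replacecommit in the cluster(clusteringInstantTime) overload above.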
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTableServiceManagerClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTableServiceManagerClient.java
new file mode 100644
index 00000000000..780942158cf
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTableServiceManagerClient.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.config.HoodieTableServiceManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.RetryHelper;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieRemoteException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Client that sends the table service action instants to the table service manager.
+ */
+public class HoodieTableServiceManagerClient {
+
+ public enum Action {
+ REQUEST,
+ CANCEL,
+ REGISTER
+ }
+
+ private static final String BASE_URL = "/v1/hoodie/service";
+
+ public static final String EXECUTE_COMPACTION = String.format("%s/%s", BASE_URL, "compact");
+
+ public static final String EXECUTE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster");
+
+ public static final String EXECUTE_CLEAN = String.format("%s/%s", BASE_URL, "clean");
+
+ public static final String ACTION = "action";
+ public static final String DATABASE_NAME_PARAM = "db_name";
+ public static final String TABLE_NAME_PARAM = "table_name";
+ public static final String BASEPATH_PARAM = "basepath";
+ public static final String INSTANT_PARAM = "instant";
+ public static final String USERNAME = "username";
+ public static final String CLUSTER = "cluster";
+ public static final String QUEUE = "queue";
+ public static final String RESOURCE = "resource";
+ public static final String PARALLELISM = "parallelism";
+ public static final String EXTRA_PARAMS = "extra_params";
+ public static final String EXECUTION_ENGINE = "execution_engine";
+
+ public static final String RETRY_EXCEPTIONS = "IOException";
+
+ private final HoodieTableServiceManagerConfig config;
+ private final HoodieTableMetaClient metaClient;
+ private final String uri;
+ private final String basePath;
+ private final String dbName;
+ private final String tableName;
+
+ private static final Logger LOG = LogManager.getLogger(HoodieTableServiceManagerClient.class);
+
+ public HoodieTableServiceManagerClient(HoodieTableMetaClient metaClient, HoodieTableServiceManagerConfig config) {
+ this.basePath = metaClient.getBasePathV2().toString();
+ this.dbName = metaClient.getTableConfig().getDatabaseName();
+ this.tableName = metaClient.getTableConfig().getTableName();
+ this.uri = config.getTableServiceManagerURIs();
+ this.config = config;
+ this.metaClient = metaClient;
+ }
+
+ private String executeRequest(String requestPath, Map<String, String> queryParameters) throws IOException {
+ URIBuilder builder = new URIBuilder(URI.create(uri)).setPath(requestPath);
+ queryParameters.forEach(builder::addParameter);
+
+ String url = builder.toString();
+ LOG.info("Sending request to table management service : (" + url + ")");
+ int timeoutMs = this.config.getConnectionTimeoutSec() * 1000;
+ int requestRetryLimit = config.getConnectionRetryLimit();
+ int connectionRetryDelay = config.getConnectionRetryDelay();
+
+ RetryHelper<String, IOException> retryHelper = new RetryHelper<>(connectionRetryDelay, requestRetryLimit, connectionRetryDelay, RETRY_EXCEPTIONS);
+ return retryHelper.tryWith(() -> Request.Get(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute().returnContent().asString()).start();
+ }
+
+ private Map<String, String> getParamsWithAdditionalParams(String[] paramNames, String[] paramVals) {
+ Map<String, String> paramsMap = new HashMap<>();
+ paramsMap.put(BASEPATH_PARAM, basePath);
+ ValidationUtils.checkArgument(paramNames.length == paramVals.length);
+ for (int i = 0; i < paramNames.length; i++) {
+ paramsMap.put(paramNames[i], paramVals[i]);
+ }
+ return paramsMap;
+ }
+
+ public Option<String> executeCompaction() {
+ try {
+ String instantRange = StringUtils.join(metaClient.reloadActiveTimeline()
+ .filterPendingCompactionTimeline()
+ .getInstantsAsStream()
+ .map(HoodieInstant::getTimestamp)
+ .toArray(String[]::new), ",");
+
+ executeRequest(EXECUTE_COMPACTION, getDefaultParams(Action.REQUEST, instantRange));
+ return Option.of(instantRange);
+ } catch (IOException e) {
+ throw new HoodieRemoteException(e);
+ }
+ }
+
+ public Option<String> executeClean() {
+ try {
+ String instantRange = StringUtils.join(metaClient.reloadActiveTimeline()
+ .getCleanerTimeline()
+ .filterInflightsAndRequested()
+ .getInstantsAsStream()
+ .map(HoodieInstant::getTimestamp)
+ .toArray(String[]::new), ",");
+
+ executeRequest(EXECUTE_CLEAN, getDefaultParams(Action.REQUEST, instantRange));
+ return Option.of(instantRange);
+ } catch (IOException e) {
+ throw new HoodieRemoteException(e);
+ }
+ }
+
+ public Option<String> executeClustering() {
+ try {
+ metaClient.reloadActiveTimeline();
+ String instantRange = StringUtils.join(ClusteringUtils.getPendingClusteringInstantTimes(metaClient)
+ .stream()
+ .map(HoodieInstant::getTimestamp)
+ .toArray(String[]::new), ",");
+
+ executeRequest(EXECUTE_CLUSTERING, getDefaultParams(Action.REQUEST, instantRange));
+ return Option.of(instantRange);
+ } catch (IOException e) {
+ throw new HoodieRemoteException(e);
+ }
+ }
+
+ private Map<String, String> getDefaultParams(Action action, String instantRange) {
+ return getParamsWithAdditionalParams(
+ new String[] {ACTION, DATABASE_NAME_PARAM, TABLE_NAME_PARAM, INSTANT_PARAM, USERNAME, QUEUE, RESOURCE, PARALLELISM, EXTRA_PARAMS, EXECUTION_ENGINE},
+ new String[] {action.name(), dbName, tableName, instantRange, config.getDeployUsername(), config.getDeployQueue(), config.getDeployResources(),
+ String.valueOf(config.getDeployParallelism()), config.getDeployExtraParams(), config.getDeployExecutionEngine()});
+ }
+}
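To see the new client in isolation, here is a minimal sketch that requests compaction of all pending compaction instants through the manager, assuming a reachable TSM endpoint and that the two property keys shown match HoodieTableServiceManagerConfig; the key names and the host/port are assumptions, not confirmed by this diff.

    import org.apache.hudi.client.HoodieTableServiceManagerClient;
    import org.apache.hudi.common.config.HoodieTableServiceManagerConfig;
    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.util.Option;

    import org.apache.hadoop.conf.Configuration;

    public class TsmCompactionRequestExample {
      static void requestPendingCompactions(Configuration hadoopConf, String basePath) {
        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.table.service.manager.enabled", "true");              // assumed key name
        props.setProperty("hoodie.table.service.manager.uris", "http://tsm-host:9091"); // assumed key name

        HoodieTableServiceManagerConfig tsmConfig =
            HoodieTableServiceManagerConfig.newBuilder().fromProperties(props).build();
        HoodieTableMetaClient metaClient =
            HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();

        HoodieTableServiceManagerClient client = new HoodieTableServiceManagerClient(metaClient, tsmConfig);
        // Joins all pending compaction instants into a comma-separated range and issues
        // an action=REQUEST call against /v1/hoodie/service/compact.
        Option<String> requested = client.executeCompaction();
        requested.ifPresent(instants -> System.out.println("Requested compaction of instants: " + instants));
      }
    }

executeClean() and executeClustering() follow the same pattern against /v1/hoodie/service/clean and /v1/hoodie/service/cluster respectively.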
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/RunsTableService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/RunsTableService.java
index 64e540568e8..7de48be975a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/RunsTableService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/RunsTableService.java
@@ -18,6 +18,7 @@
package org.apache.hudi.client;
+import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.log4j.LogManager;
@@ -34,4 +35,8 @@ public interface RunsTableService {
}
return enabled;
}
+
+ default boolean shouldDelegateToTableServiceManager(HoodieWriteConfig config, ActionType actionType) {
+ return config.getTableServiceManagerConfig().isEnabledAndActionSupported(actionType);
+ }
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index b70b13c0833..879136206e2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieMetaserverConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.HoodieTableServiceManagerConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
@@ -558,7 +559,8 @@ public class HoodieWriteConfig extends HoodieConfig {
private FileSystemViewStorageConfig viewStorageConfig;
private HoodiePayloadConfig hoodiePayloadConfig;
private HoodieMetadataConfig metadataConfig;
- private HoodieMetaserverConfig metaserverConfig;
+ private HoodieMetaserverConfig metastoreConfig;
+ private HoodieTableServiceManagerConfig tableServiceManagerConfig;
private HoodieCommonConfig commonConfig;
private HoodieStorageConfig storageConfig;
private EngineType engineType;
@@ -951,7 +953,8 @@ public class HoodieWriteConfig extends HoodieConfig {
this.viewStorageConfig = clientSpecifiedViewStorageConfig;
this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build();
this.metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(props).build();
- this.metaserverConfig = HoodieMetaserverConfig.newBuilder().fromProperties(props).build();
+ this.metastoreConfig = HoodieMetaserverConfig.newBuilder().fromProperties(props).build();
+ this.tableServiceManagerConfig = HoodieTableServiceManagerConfig.newBuilder().fromProperties(props).build();
this.commonConfig = HoodieCommonConfig.newBuilder().fromProperties(props).build();
this.storageConfig = HoodieStorageConfig.newBuilder().fromProperties(props).build();
}
@@ -2062,6 +2065,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return metadataConfig;
}
+ public HoodieTableServiceManagerConfig getTableServiceManagerConfig() {
+ return tableServiceManagerConfig;
+ }
+
public HoodieCommonConfig getCommonConfig() {
return commonConfig;
}
@@ -2275,7 +2282,7 @@ public class HoodieWriteConfig extends HoodieConfig {
* Metastore configs.
*/
public boolean isMetaserverEnabled() {
- return metaserverConfig.isMetaserverEnabled();
+ return metastoreConfig.isMetaserverEnabled();
}
/**
@@ -2286,6 +2293,13 @@ public class HoodieWriteConfig extends HoodieConfig {
getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE));
}
+ /**
+ * Table Service Manager configs.
+ */
+ public boolean isTableServiceManagerEnabled() {
+ return tableServiceManagerConfig.isTableServiceManagerEnabled();
+ }
+
public static class Builder {
protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
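A minimal sketch of surfacing these new accessors from a write config, assuming the builder calls withPath/forTable/withProperties behave as in existing Hudi usage and that the property key names below match HoodieTableServiceManagerConfig (both are assumptions; the table name is hypothetical).

    import java.util.Properties;

    import org.apache.hudi.config.HoodieWriteConfig;

    public class TsmWriteConfigExample {
      static HoodieWriteConfig buildWriteConfig(String basePath) {
        Properties props = new Properties();
        props.setProperty("hoodie.table.service.manager.enabled", "true");              // assumed key name
        props.setProperty("hoodie.table.service.manager.uris", "http://tsm-host:9091"); // assumed key name

        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .forTable("trips")          // hypothetical table name
            .withProperties(props)
            .build();

        // The accessors added in this hunk expose the parsed TSM settings to the clients.
        if (writeConfig.isTableServiceManagerEnabled()) {
          System.out.println("TSM URIs: " + writeConfig.getTableServiceManagerConfig().getTableServiceManagerURIs());
        }
        return writeConfig;
      }
    }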
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index 1f6a5a1d790..432c6dc80d3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -37,6 +37,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -45,6 +46,8 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.hudi.common.util.MapUtils.nonEmpty;
+
public class CleanPlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieCleanerPlan>> {
private static final Logger LOG = LogManager.getLogger(CleanPlanner.class);
@@ -145,8 +148,8 @@ public class CleanPlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I
*/
protected Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
final HoodieCleanerPlan cleanerPlan = requestClean(context);
- if ((cleanerPlan.getFilePathsToBeDeletedPerPartition() != null)
- && !cleanerPlan.getFilePathsToBeDeletedPerPartition().isEmpty()
+ Option<HoodieCleanerPlan> option = Option.empty();
+ if (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition())
&& cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
// Only create cleaner plan which does some work
final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
@@ -158,9 +161,10 @@ public class CleanPlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I
LOG.error("Got exception when saving cleaner requested file", e);
throw new HoodieIOException(e.getMessage(), e);
}
- return Option.of(cleanerPlan);
+ option = Option.of(cleanerPlan);
}
- return Option.empty();
+
+ return option;
}
@Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
index 683be09efee..2445043e07c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
@@ -104,6 +104,7 @@ public class ClusteringPlanActionExecutor<T, I, K, O> extends BaseActionExecutor
throw new HoodieIOException("Exception scheduling clustering", ioe);
}
}
+
return planOption;
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index 328ed7d9221..7444c6eef1b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -36,19 +36,22 @@ import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
-
import org.apache.hudi.table.action.compact.plan.generators.BaseHoodieCompactionPlanGenerator;
import org.apache.hudi.table.action.compact.plan.generators.HoodieCompactionPlanGenerator;
import org.apache.hudi.table.action.compact.plan.generators.HoodieLogCompactionPlanGenerator;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.hudi.common.util.CollectionUtils.nonEmpty;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
public class ScheduleCompactionActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieCompactionPlan>> {
@@ -108,7 +111,8 @@ public class ScheduleCompactionActionExecutor<T, I, K, O> extends BaseActionExec
}
HoodieCompactionPlan plan = scheduleCompaction();
- if (plan != null && (plan.getOperations() != null) && (!plan.getOperations().isEmpty())) {
+ Option<HoodieCompactionPlan> option = Option.empty();
+ if (plan != null && nonEmpty(plan.getOperations())) {
extraMetadata.ifPresent(plan::setExtraMetadata);
try {
if (operationType.equals(WriteOperationType.COMPACT)) {
@@ -125,11 +129,13 @@ public class ScheduleCompactionActionExecutor<T, I, K, O> extends BaseActionExec
} catch (IOException ioe) {
throw new HoodieIOException("Exception scheduling compaction", ioe);
}
- return Option.of(plan);
+ option = Option.of(plan);
}
- return Option.empty();
+
+ return option;
}
+ @Nullable
private HoodieCompactionPlan scheduleCompaction() {
LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
// judge if we need to compact according to num delta commits and time elapsed
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
index 5e5d6de92d3..fe0d1879400 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
@@ -42,6 +42,8 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
@@ -65,6 +67,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T extends HoodieRecordPa
this.engineContext = engineContext;
}
+ @Nullable
public HoodieCompactionPlan generateCompactionPlan() throws IOException {
// Accumulator to keep track of total log files for a table
HoodieAccumulator totalLogFiles = engineContext.newAccumulator();
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
new file mode 100644
index 00000000000..4b3eaaa1d42
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.compact.CompactHelpers;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
+import org.apache.hudi.util.FlinkClientUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class HoodieFlinkTableServiceClient<T> extends BaseHoodieTableServiceClient<List<WriteStatus>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkTableServiceClient.class);
+
+ /**
+ * Cached metadata writer for coordinator to reuse for each commit.
+ */
+ private HoodieBackedTableMetadataWriter metadataWriter;
+
+ protected HoodieFlinkTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+ super(context, clientConfig);
+ }
+
+ @Override
+ protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
+ // only used for metadata table, the compaction happens in single thread
+ HoodieWriteMetadata<List<WriteStatus>> compactionMetadata = getHoodieTable().compact(context, compactionInstantTime);
+ commitCompaction(compactionInstantTime, compactionMetadata.getCommitMetadata().get(), Option.empty());
+ return compactionMetadata;
+ }
+
+ @Override
+ public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+ extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
+ completeCompaction(metadata, getHoodieTable(), compactionInstantTime);
+ }
+
+ @Override
+ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) {
+ this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
+ List<HoodieWriteStat> writeStats = metadata.getWriteStats();
+ final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
+ try {
+ this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
+ finalizeWrite(table, compactionCommitTime, writeStats);
+ // commit to the data table after committing to the metadata table.
+ // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure that all writes to the metadata table happen within a
+ // single lock (single writer), because concurrent writes to the metadata table would conflict since they all update the same partition.
+ writeTableMetadata(table, compactionCommitTime, compactionInstant.getAction(), metadata);
+ LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
+ CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
+ } finally {
+ this.txnManager.endTransaction(Option.of(compactionInstant));
+ }
+ WriteMarkersFactory
+ .get(config.getMarkersType(), table, compactionCommitTime)
+ .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+ if (compactionTimer != null) {
+ long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
+ try {
+ metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime(),
+ durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
+ } catch (ParseException e) {
+ throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
+ + config.getBasePath() + " at time " + compactionCommitTime, e);
+ }
+ }
+ LOG.info("Compacted successfully on commit " + compactionCommitTime);
+ }
+
+ protected void completeClustering(
+ HoodieReplaceCommitMetadata metadata,
+ HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+ String clusteringCommitTime) {
+ this.context.setJobStatus(this.getClass().getSimpleName(), "Collect clustering write status and commit clustering");
+ HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
+ List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
+ e.getValue().stream()).collect(Collectors.toList());
+ if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) {
+ throw new HoodieClusteringException("Clustering failed to write to files:"
+ + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
+ }
+
+ try {
+ this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
+ finalizeWrite(table, clusteringCommitTime, writeStats);
+ // commit to the data table after committing to the metadata table.
+ // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure that all writes to the metadata table happen within a
+ // single lock (single writer), because concurrent writes to the metadata table would conflict since they all update the same partition.
+ writeTableMetadata(table, clusteringCommitTime, clusteringInstant.getAction(), metadata);
+ LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata);
+ table.getActiveTimeline().transitionReplaceInflightToComplete(
+ HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
+ Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ } catch (IOException e) {
+ throw new HoodieClusteringException(
+ "Failed to commit " + table.getMetaClient().getBasePath() + " at time " + clusteringCommitTime, e);
+ } finally {
+ this.txnManager.endTransaction(Option.of(clusteringInstant));
+ }
+
+ WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime)
+ .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+ if (clusteringTimer != null) {
+ long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
+ try {
+ metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(),
+ durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
+ } catch (ParseException e) {
+ throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
+ + config.getBasePath() + " at time " + clusteringCommitTime, e);
+ }
+ }
+ LOG.info("Clustering successfully on commit " + clusteringCommitTime);
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
+ return null;
+ }
+
+ @Override
+ protected HoodieTable<?, ?, ?, ?> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
+ return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+ }
+
+ public HoodieFlinkTable<?> getHoodieTable() {
+ return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+ }
+
+ @Override
+ public void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
+ try (HoodieBackedTableMetadataWriter metadataWriter = initMetadataWriter()) {
+ metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType, instantTime));
+ } catch (Exception e) {
+ throw new HoodieException("Failed to update metadata", e);
+ }
+ }
+
+ /**
+ * Initialize the table metadata writer, e.g., bootstrap the metadata table
+ * from the filesystem if it does not exist.
+ */
+ private HoodieBackedTableMetadataWriter initMetadataWriter() {
+ return (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(
+ FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT);
+ }
+
+ public void initMetadataTable() {
+ HoodieFlinkTable<?> table = getHoodieTable();
+ if (config.isMetadataTableEnabled()) {
+ // initialize the metadata table path
+ // guard the metadata writer with concurrent lock
+ try {
+ this.txnManager.getLockManager().lock();
+ initMetadataWriter().close();
+ } catch (Exception e) {
+ throw new HoodieException("Failed to initialize metadata table", e);
+ } finally {
+ this.txnManager.getLockManager().unlock();
+ }
+ // clean the obsolete index stats
+ table.deleteMetadataIndexIfNecessary();
+ } else {
+ // delete the metadata table if it was enabled but is now disabled
+ table.maybeDeleteMetadataTable();
+ }
+ }
+}
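A minimal sketch of the Flink startup path after this split, assuming a prepared HoodieFlinkEngineContext and HoodieWriteConfig: the write client construction mirrors the wiring shown in the next hunk, and initMetadataTable() on the write client now routes to the table service client above.

    import org.apache.hudi.client.HoodieFlinkWriteClient;
    import org.apache.hudi.client.common.HoodieFlinkEngineContext;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class FlinkMetadataBootstrapExample {
      static HoodieFlinkWriteClient<?> openWriteClient(HoodieFlinkEngineContext context, HoodieWriteConfig writeConfig) {
        HoodieFlinkWriteClient<?> writeClient = new HoodieFlinkWriteClient<>(context, writeConfig);
        // Should run once on the coordinator before any writes: bootstraps the metadata
        // table under the transaction lock, or deletes it if the feature is now disabled.
        writeClient.initMetadataTable();
        return writeClient;
      }
    }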
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 5bf948de368..665950c036c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -18,7 +18,6 @@
package org.apache.hudi.client;
-import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -33,31 +32,21 @@ import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieClusteringException;
-import org.apache.hudi.exception.HoodieCommitException;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.FlinkHoodieIndexFactory;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.FlinkWriteHandleFactory;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.MiniBatchHandle;
-import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
-import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
-import org.apache.hudi.util.FlinkClientUtil;
import org.apache.hudi.util.WriteStatMerger;
import com.codahale.metrics.Timer;
@@ -65,9 +54,6 @@ import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.text.ParseException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -99,6 +85,7 @@ public class HoodieFlinkWriteClient<T> extends
public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
super(context, writeConfig, FlinkUpgradeDowngradeHelper.getInstance());
this.bucketToHandles = new HashMap<>();
+ this.tableServiceClient = new HoodieFlinkTableServiceClient<>(context, writeConfig);
}
/**
@@ -283,44 +270,14 @@ public class HoodieFlinkWriteClient<T> extends
@Override
protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
- try (HoodieBackedTableMetadataWriter metadataWriter = initMetadataWriter()) {
- metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType, instantTime));
- } catch (Exception e) {
- throw new HoodieException("Failed to update metadata", e);
- }
- }
-
- /**
- * Initialize the table metadata writer, for e.g, bootstrap the metadata table
- * from the filesystem if it does not exist.
- */
- private HoodieBackedTableMetadataWriter initMetadataWriter() {
- return (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(
- FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT);
+ tableServiceClient.writeTableMetadata(table, instantTime, actionType, metadata);
}
/**
* Initialized the metadata table on start up, should only be called once on driver.
*/
public void initMetadataTable() {
- HoodieFlinkTable<?> table = getHoodieTable();
- if (config.isMetadataTableEnabled()) {
- // initialize the metadata table path
- // guard the metadata writer with concurrent lock
- try {
- this.txnManager.getLockManager().lock();
- initMetadataWriter().close();
- } catch (Exception e) {
- throw new HoodieException("Failed to initialize metadata table", e);
- } finally {
- this.txnManager.getLockManager().unlock();
- }
- // clean the obsolete index stats
- table.deleteMetadataIndexIfNecessary();
- } else {
- // delete the metadata table if it was enabled but is now disabled
- table.maybeDeleteMetadataTable();
- }
+ ((HoodieFlinkTableServiceClient<T>) tableServiceClient).initMetadataTable();
}
/**
@@ -331,11 +288,7 @@ public class HoodieFlinkWriteClient<T> extends
* checkpoint finish.
*/
public void startAsyncCleaning() {
- if (this.asyncCleanerService == null) {
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
- } else {
- this.asyncCleanerService.start(null);
- }
+ tableServiceClient.startAsyncCleanerService(this);
}
/**
@@ -346,9 +299,9 @@ public class HoodieFlinkWriteClient<T> extends
* checkpoint finish.
*/
public void waitForCleaningFinish() {
- if (this.asyncCleanerService != null) {
+ if (tableServiceClient.asyncCleanerService != null) {
LOG.info("Cleaner has been spawned already. Waiting for it to finish");
- AsyncCleanerService.waitForCompletion(asyncCleanerService);
+ tableServiceClient.asyncClean();
LOG.info("Cleaner has finished");
}
}
@@ -396,9 +349,7 @@ public class HoodieFlinkWriteClient<T> extends
String compactionInstantTime,
HoodieCommitMetadata metadata,
Option<Map<String, String>> extraMetadata) {
- HoodieFlinkTable<T> table = getHoodieTable();
- extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
- completeCompaction(metadata, table, compactionInstantTime);
+ tableServiceClient.commitCompaction(compactionInstantTime, metadata, extraMetadata);
}
@Override
@@ -406,43 +357,13 @@ public class HoodieFlinkWriteClient<T> extends
HoodieCommitMetadata metadata,
HoodieTable table,
String compactionCommitTime) {
- this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
- List<HoodieWriteStat> writeStats = metadata.getWriteStats();
- final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
- try {
- this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
- finalizeWrite(table, compactionCommitTime, writeStats);
- // commit to data table after committing to metadata table.
- // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
- // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
- writeTableMetadata(table, compactionCommitTime, compactionInstant.getAction(), metadata);
- LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
- CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
- } finally {
- this.txnManager.endTransaction(Option.of(compactionInstant));
- }
- WriteMarkersFactory
- .get(config.getMarkersType(), table, compactionCommitTime)
- .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
- if (compactionTimer != null) {
- long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
- try {
- metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime(),
- durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
- } catch (ParseException e) {
- throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
- + config.getBasePath() + " at time " + compactionCommitTime, e);
- }
- }
- LOG.info("Compacted successfully on commit " + compactionCommitTime);
+ tableServiceClient.completeCompaction(metadata, table, compactionCommitTime);
}
@Override
protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
// only used for metadata table, the compaction happens in single thread
- HoodieWriteMetadata<List<WriteStatus>> compactionMetadata = getHoodieTable().compact(context, compactionInstantTime);
- commitCompaction(compactionInstantTime, compactionMetadata.getCommitMetadata().get(), Option.empty());
- return compactionMetadata;
+ return tableServiceClient.compact(compactionInstantTime, shouldComplete);
}
@Override
@@ -454,46 +375,7 @@ public class HoodieFlinkWriteClient<T> extends
HoodieReplaceCommitMetadata metadata,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String clusteringCommitTime) {
- this.context.setJobStatus(this.getClass().getSimpleName(), "Collect clustering write status and commit clustering");
- HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
- List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
- e.getValue().stream()).collect(Collectors.toList());
- if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) {
- throw new HoodieClusteringException("Clustering failed to write to files:"
- + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
- }
-
- try {
- this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
- finalizeWrite(table, clusteringCommitTime, writeStats);
- // commit to data table after committing to metadata table.
- // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
- // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
- writeTableMetadata(table, clusteringCommitTime, clusteringInstant.getAction(), metadata);
- LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata);
- table.getActiveTimeline().transitionReplaceInflightToComplete(
- HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
- Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
- } catch (IOException e) {
- throw new HoodieClusteringException(
- "Failed to commit " + table.getMetaClient().getBasePath() + " at time " + clusteringCommitTime, e);
- } finally {
- this.txnManager.endTransaction(Option.of(clusteringInstant));
- }
-
- WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime)
- .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
- if (clusteringTimer != null) {
- long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
- try {
- metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(),
- durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
- } catch (ParseException e) {
- throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
- + config.getBasePath() + " at time " + clusteringCommitTime, e);
- }
- }
- LOG.info("Clustering successfully on commit " + clusteringCommitTime);
+ ((HoodieFlinkTableServiceClient<T>) tableServiceClient).completeClustering(metadata, table, clusteringCommitTime);
}
@Override
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java
new file mode 100644
index 00000000000..2d823aa7f57
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.table.HoodieJavaTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.List;
+import java.util.Map;
+
+public class HoodieJavaTableServiceClient extends BaseHoodieTableServiceClient<List<WriteStatus>> {
+
+ protected HoodieJavaTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+ super(context, clientConfig);
+ }
+
+ @Override
+ protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
+ throw new HoodieNotSupportedException("Compact is not supported in HoodieJavaTableServiceClient");
+ }
+
+ @Override
+ public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+ throw new HoodieNotSupportedException("CommitCompaction is not supported in HoodieJavaTableServiceClient");
+ }
+
+ @Override
+ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) {
+ throw new HoodieNotSupportedException("CompleteCompaction is not supported in HoodieJavaTableServiceClient");
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
+ throw new HoodieNotSupportedException("Cluster is not supported in HoodieJavaTableServiceClient");
+ }
+
+ @Override
+ protected HoodieTable<?, ?, ?, ?> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
+ return HoodieJavaTable.create(config, context);
+ }
+}
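Since the Java engine does not run table services, the new delegate simply rejects these calls. A hedged sketch of what a caller should expect, assuming the one-argument compact(String) entry point on the write client and a pre-built engine context and write config:

    // Hypothetical caller; engineContext and writeConfig are assumed to be built elsewhere.
    HoodieJavaWriteClient<HoodieAvroPayload> client =
        new HoodieJavaWriteClient<>(engineContext, writeConfig);
    try {
      // Compaction is routed to HoodieJavaTableServiceClient, which rejects table service calls.
      client.compact("20230117152425000");
    } catch (HoodieNotSupportedException e) {
      // Expected on the Java engine: compaction and clustering are not supported here.
    }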
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index ed1b400a757..0f7f48194cd 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -51,8 +51,9 @@ import java.util.stream.Collectors;
public class HoodieJavaWriteClient<T> extends
BaseHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
- public HoodieJavaWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
- super(context, clientConfig, JavaUpgradeDowngradeHelper.getInstance());
+ public HoodieJavaWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
+ super(context, writeConfig, JavaUpgradeDowngradeHelper.getInstance());
+ this.tableServiceClient = new HoodieJavaTableServiceClient(context, writeConfig);
}
public HoodieJavaWriteClient(HoodieEngineContext context,
@@ -60,6 +61,7 @@ public class HoodieJavaWriteClient<T> extends
boolean rollbackPending,
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService, JavaUpgradeDowngradeHelper.getInstance());
+ this.tableServiceClient = new HoodieJavaTableServiceClient(context, writeConfig);
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java
index 73037df40cb..888dda3ef2c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java
@@ -50,7 +50,7 @@ public class HoodieSparkClusteringClient<T> extends
public void cluster(HoodieInstant instant) throws IOException {
LOG.info("Executing clustering instance " + instant);
SparkRDDWriteClient<T> writeClient = (SparkRDDWriteClient<T>) clusteringClient;
- Option<HoodieCommitMetadata> commitMetadata = writeClient.cluster(instant.getTimestamp(), true).getCommitMetadata();
+ Option<HoodieCommitMetadata> commitMetadata = writeClient.cluster(instant.getTimestamp()).getCommitMetadata();
Stream<HoodieWriteStat> hoodieWriteStatStream = commitMetadata.get().getPartitionToWriteStats().entrySet().stream().flatMap(e ->
e.getValue().stream());
long errorsCount = hoodieWriteStatStream.mapToLong(HoodieWriteStat::getTotalWriteErrors).sum();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
new file mode 100644
index 00000000000..1fcd6acb12d
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableServiceType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.compact.CompactHelpers;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class SparkRDDTableServiceClient<T> extends BaseHoodieTableServiceClient<JavaRDD<WriteStatus>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SparkRDDTableServiceClient.class);
+
+ protected SparkRDDTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+ super(context, clientConfig);
+ }
+
+ @Override
+ protected HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
+ HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
+ HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
+ HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
+ if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
+ table.rollbackInflightCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
+ table.getMetaClient().reloadActiveTimeline();
+ }
+ compactionTimer = metrics.getCompactionCtx();
+ HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.compact(context, compactionInstantTime);
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
+ if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
+ completeTableService(TableServiceType.COMPACT, compactionMetadata.getCommitMetadata().get(), table, compactionInstantTime);
+ }
+ return compactionMetadata;
+ }
+
+ @Override
+ protected HoodieWriteMetadata<JavaRDD<WriteStatus>> logCompact(String logCompactionInstantTime, boolean shouldComplete) {
+ HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
+ HoodieTimeline pendingLogCompactionTimeline = table.getActiveTimeline().filterPendingLogCompactionTimeline();
+ HoodieInstant inflightInstant = HoodieTimeline.getLogCompactionInflightInstant(logCompactionInstantTime);
+ if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) {
+ LOG.info("Found Log compaction inflight file. Rolling back the commit and exiting.");
+ table.rollbackInflightLogCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
+ table.getMetaClient().reloadActiveTimeline();
+ throw new HoodieException("Inflight logcompaction file exists");
+ }
+ logCompactionTimer = metrics.getLogCompactionCtx();
+ WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionInstantTime);
+ HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.logCompact(context, logCompactionInstantTime);
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> logCompactionMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
+ if (shouldComplete && logCompactionMetadata.getCommitMetadata().isPresent()) {
+ completeTableService(TableServiceType.LOG_COMPACT, logCompactionMetadata.getCommitMetadata().get(), table, logCompactionInstantTime);
+ }
+ return logCompactionMetadata;
+ }
+
+ @Override
+ public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+ HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
+ extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
+ completeCompaction(metadata, table, compactionInstantTime);
+ }
+
+ @Override
+ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) {
+ this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
+ List<HoodieWriteStat> writeStats = metadata.getWriteStats();
+ final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
+ try {
+ this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
+ finalizeWrite(table, compactionCommitTime, writeStats);
+ // commit to data table after committing to metadata table.
+ updateTableMetadata(table, metadata, compactionInstant);
+ LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
+ CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
+ } finally {
+ this.txnManager.endTransaction(Option.of(compactionInstant));
+ }
+ WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime)
+ .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+ if (compactionTimer != null) {
+ long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
+ HoodieActiveTimeline.parseDateFromInstantTimeSafely(compactionCommitTime).ifPresent(parsedInstant ->
+ metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION)
+ );
+ }
+ LOG.info("Compacted successfully on commit " + compactionCommitTime);
+ }
+
+ @Override
+ protected void completeLogCompaction(HoodieCommitMetadata metadata,
+ HoodieTable table,
+ String logCompactionCommitTime) {
+ this.context.setJobStatus(this.getClass().getSimpleName(), "Collect log compaction write status and commit compaction");
+ List<HoodieWriteStat> writeStats = metadata.getWriteStats();
+ final HoodieInstant logCompactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.LOG_COMPACTION_ACTION, logCompactionCommitTime);
+ try {
+ this.txnManager.beginTransaction(Option.of(logCompactionInstant), Option.empty());
+ preCommit(metadata);
+ finalizeWrite(table, logCompactionCommitTime, writeStats);
+ // commit to data table after committing to metadata table.
+ updateTableMetadata(table, metadata, logCompactionInstant);
+ LOG.info("Committing Log Compaction " + logCompactionCommitTime + ". Finished with result " + metadata);
+ CompactHelpers.getInstance().completeInflightLogCompaction(table, logCompactionCommitTime, metadata);
+ } finally {
+ this.txnManager.endTransaction(Option.of(logCompactionInstant));
+ }
+ WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionCommitTime)
+ .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+ if (compactionTimer != null) {
+ long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
+ HoodieActiveTimeline.parseDateFromInstantTimeSafely(logCompactionCommitTime).ifPresent(parsedInstant ->
+ metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.LOG_COMPACTION_ACTION)
+ );
+ }
+ LOG.info("Log Compacted successfully on commit " + logCompactionCommitTime);
+ }
+
+ @Override
+ public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
+ HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
+ HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
+ HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
+ if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
+ table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
+ table.getMetaClient().reloadActiveTimeline();
+ }
+ clusteringTimer = metrics.getClusteringCtx();
+ LOG.info("Starting clustering at " + clusteringInstant);
+ HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.cluster(context, clusteringInstant);
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
+ // Validation has to be done after cloning. If not, it could result in dereferencing the write status twice, which means clustering could get executed twice.
+ validateClusteringCommit(clusteringMetadata, clusteringInstant, table);
+ // TODO : Where is shouldComplete used ?
+ if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
+ completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
+ }
+ return clusteringMetadata;
+ }
+
+ // TODO : To enforce priority between table service and ingestion writer, use transactions here and invoke strategy
+ private void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata,
+ HoodieTable table,
+ String commitInstant) {
+
+ switch (tableServiceType) {
+ case CLUSTER:
+ completeClustering((HoodieReplaceCommitMetadata) metadata, table, commitInstant);
+ break;
+ case COMPACT:
+ completeCompaction(metadata, table, commitInstant);
+ break;
+ case LOG_COMPACT:
+ completeLogCompaction(metadata, table, commitInstant);
+ break;
+ default:
+ throw new IllegalArgumentException("This table service is not valid " + tableServiceType);
+ }
+ }
+
+ private void completeClustering(HoodieReplaceCommitMetadata metadata,
+ HoodieTable table,
+ String clusteringCommitTime) {
+ List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
+ e.getValue().stream()).collect(Collectors.toList());
+
+ if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) {
+ throw new HoodieClusteringException("Clustering failed to write to files:"
+ + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
+ }
+
+ final HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime);
+ try {
+ this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
+
+ finalizeWrite(table, clusteringCommitTime, writeStats);
+ // Update the table's metadata.
+ updateTableMetadata(table, metadata, clusteringInstant);
+
+ LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);
+
+ table.getActiveTimeline().transitionReplaceInflightToComplete(
+ clusteringInstant,
+ Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ } catch (Exception e) {
+ throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
+ } finally {
+ this.txnManager.endTransaction(Option.of(clusteringInstant));
+ }
+ WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime)
+ .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+ if (clusteringTimer != null) {
+ long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
+ HoodieActiveTimeline.parseDateFromInstantTimeSafely(clusteringCommitTime).ifPresent(parsedInstant ->
+ metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION)
+ );
+ }
+ LOG.info("Clustering successfully on commit " + clusteringCommitTime);
+ }
+
+ private void validateClusteringCommit(HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata, String clusteringCommitTime, HoodieTable table) {
+ if (clusteringMetadata.getWriteStatuses().isEmpty()) {
+ HoodieClusteringPlan clusteringPlan = ClusteringUtils.getClusteringPlan(
+ table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(clusteringCommitTime))
+ .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
+ "Unable to read clustering plan for instant: " + clusteringCommitTime));
+ throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + clusteringCommitTime
+ + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
+ + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
+ + " write statuses");
+ }
+ }
+
+ private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata,
+ HoodieInstant hoodieInstant) {
+ boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction(), hoodieInstant.getTimestamp());
+ // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to the metadata table happen within a
+ // single lock (single writer), because more than one write to the metadata table would result in conflicts since all of them update the same partition.
+ table.getMetadataWriter(hoodieInstant.getTimestamp())
+ .ifPresent(writer -> ((HoodieTableMetadataWriter) writer).update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
+ }
+
+ /**
+ * Initialize the metadata table if needed. Creating the metadata table writer
+ * will trigger the initial bootstrapping from the data table.
+ *
+ * @param inFlightInstantTimestamp - The in-flight action responsible for the metadata table initialization
+ */
+ protected void initializeMetadataTable(Option<String> inFlightInstantTimestamp) {
+ if (config.isMetadataTableEnabled()) {
+ SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
+ context, Option.empty(), inFlightInstantTimestamp);
+ }
+ }
+
+ @Override
+ protected HoodieTable<?, ?, ?, ?> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
+ return HoodieSparkTable.create(config, context);
+ }
+
+ @Override
+ protected void preCommit(HoodieCommitMetadata metadata) {
+ // Create a Hoodie table after startTxn which encapsulated the commits and files visible.
+ // Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload
+ HoodieTable table = createTable(config, hadoopConf);
+ resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
+ }
+}
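The write client's compact, logCompact and cluster entry points (rewired below) now forward to this class. A minimal sketch of driving compaction from a Spark job, under the assumptions that jsc is an existing JavaSparkContext, writeConfig is a prepared HoodieWriteConfig, and the standard scheduleCompaction/compact calls on the write client are used:

    // Hypothetical Spark driver snippet; jsc and writeConfig are assumed to exist.
    SparkRDDWriteClient<HoodieAvroPayload> client =
        new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), writeConfig);

    Option<String> instant = client.scheduleCompaction(Option.empty());
    if (instant.isPresent()) {
      // compact(instant) delegates to SparkRDDTableServiceClient.compact(instant, true), which rolls
      // back a failed inflight attempt if present, runs table.compact(...), and completes the commit.
      client.compact(instant.get());
    }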
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 4ddcd13e4f4..5c094b8d5be 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -18,11 +18,8 @@
package org.apache.hudi.client;
-import org.apache.hudi.avro.model.HoodieClusteringGroup;
-import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
-import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
@@ -30,22 +27,15 @@ import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
-import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndexFactory;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
@@ -55,8 +45,6 @@ import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.compact.CompactHelpers;
-import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
import com.codahale.metrics.Timer;
@@ -66,11 +54,9 @@ import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
@SuppressWarnings("checkstyle:LineLength")
public class SparkRDDWriteClient<T> extends
@@ -96,6 +82,7 @@ public class SparkRDDWriteClient<T> extends
public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig,
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService, SparkUpgradeDowngradeHelper.getInstance());
+ this.tableServiceClient = new SparkRDDTableServiceClient<>(context, writeConfig);
}
@Override
@@ -289,56 +276,21 @@ public class SparkRDDWriteClient<T> extends
@Override
public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
- HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
- extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
- completeCompaction(metadata, table, compactionInstantTime);
+ tableServiceClient.commitCompaction(compactionInstantTime, metadata, extraMetadata);
}
@Override
protected void completeCompaction(HoodieCommitMetadata metadata,
HoodieTable table,
String compactionCommitTime) {
- this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
- List<HoodieWriteStat> writeStats = metadata.getWriteStats();
- final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
- try {
- this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
- finalizeWrite(table, compactionCommitTime, writeStats);
- // commit to data table after committing to metadata table.
- updateTableMetadata(table, metadata, compactionInstant);
- LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
- CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
- } finally {
- this.txnManager.endTransaction(Option.of(compactionInstant));
- }
- WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime)
- .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
- if (compactionTimer != null) {
- long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
- HoodieActiveTimeline.parseDateFromInstantTimeSafely(compactionCommitTime).ifPresent(parsedInstant ->
- metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION)
- );
- }
- LOG.info("Compacted successfully on commit " + compactionCommitTime);
+ tableServiceClient.completeCompaction(metadata, table, compactionCommitTime);
}
@Override
protected HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient());
- HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
- HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
- if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
- table.rollbackInflightCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
- table.getMetaClient().reloadActiveTimeline();
- }
- compactionTimer = metrics.getCompactionCtx();
- HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.compact(context, compactionInstantTime);
- HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
- if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
- completeTableService(TableServiceType.COMPACT, compactionMetadata.getCommitMetadata().get(), table, compactionInstantTime);
- }
- return compactionMetadata;
+ return tableServiceClient.compact(compactionInstantTime, shouldComplete);
}
@Override
@@ -352,136 +304,21 @@ public class SparkRDDWriteClient<T> extends
protected void completeLogCompaction(HoodieCommitMetadata metadata,
HoodieTable table,
String logCompactionCommitTime) {
- this.context.setJobStatus(this.getClass().getSimpleName(), "Collect log compaction write status and commit compaction");
- List<HoodieWriteStat> writeStats = metadata.getWriteStats();
- final HoodieInstant logCompactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.LOG_COMPACTION_ACTION, logCompactionCommitTime);
- try {
- this.txnManager.beginTransaction(Option.of(logCompactionInstant), Option.empty());
- preCommit(logCompactionInstant, metadata);
- finalizeWrite(table, logCompactionCommitTime, writeStats);
- // commit to data table after committing to metadata table.
- updateTableMetadata(table, metadata, logCompactionInstant);
- LOG.info("Committing Log Compaction " + logCompactionCommitTime + ". Finished with result " + metadata);
- CompactHelpers.getInstance().completeInflightLogCompaction(table, logCompactionCommitTime, metadata);
- } finally {
- this.txnManager.endTransaction(Option.of(logCompactionInstant));
- }
- WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionCommitTime)
- .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
- if (compactionTimer != null) {
- long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
- HoodieActiveTimeline.parseDateFromInstantTimeSafely(logCompactionCommitTime).ifPresent(parsedInstant ->
- metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.LOG_COMPACTION_ACTION)
- );
- }
- LOG.info("Log Compacted successfully on commit " + logCompactionCommitTime);
+ tableServiceClient.completeLogCompaction(metadata, table, logCompactionCommitTime);
}
@Override
protected HoodieWriteMetadata<JavaRDD<WriteStatus>> logCompact(String logCompactionInstantTime, boolean shouldComplete) {
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
preWrite(logCompactionInstantTime, WriteOperationType.LOG_COMPACT, table.getMetaClient());
- HoodieTimeline pendingLogCompactionTimeline = table.getActiveTimeline().filterPendingLogCompactionTimeline();
- HoodieInstant inflightInstant = HoodieTimeline.getLogCompactionInflightInstant(logCompactionInstantTime);
- if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) {
- LOG.info("Found Log compaction inflight file. Rolling back the commit and exiting.");
- table.rollbackInflightLogCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
- table.getMetaClient().reloadActiveTimeline();
- throw new HoodieException("Inflight logcompaction file exists");
- }
- logCompactionTimer = metrics.getLogCompactionCtx();
- WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionInstantTime);
- HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.logCompact(context, logCompactionInstantTime);
- HoodieWriteMetadata<JavaRDD<WriteStatus>> logCompactionMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
- if (shouldComplete && logCompactionMetadata.getCommitMetadata().isPresent()) {
- completeTableService(TableServiceType.LOG_COMPACT, logCompactionMetadata.getCommitMetadata().get(), table, logCompactionInstantTime);
- }
- return logCompactionMetadata;
+ return tableServiceClient.logCompact(logCompactionInstantTime, shouldComplete);
}
@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient());
- HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
- HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
- if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
- table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
- table.getMetaClient().reloadActiveTimeline();
- }
- clusteringTimer = metrics.getClusteringCtx();
- LOG.info("Starting clustering at " + clusteringInstant);
- HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.cluster(context, clusteringInstant);
- HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
- // Validation has to be done after cloning. if not, it could result in dereferencing the write status twice which means clustering could get executed twice.
- validateClusteringCommit(clusteringMetadata, clusteringInstant, table);
- // TODO : Where is shouldComplete used ?
- if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
- completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
- }
- return clusteringMetadata;
- }
-
- private void completeClustering(HoodieReplaceCommitMetadata metadata,
- HoodieTable table,
- String clusteringCommitTime) {
- List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
- e.getValue().stream()).collect(Collectors.toList());
-
- if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) {
- throw new HoodieClusteringException("Clustering failed to write to files:"
- + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
- }
-
- final HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime);
- try {
- this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
-
- finalizeWrite(table, clusteringCommitTime, writeStats);
- // Update table's metadata (table)
- updateTableMetadata(table, metadata, clusteringInstant);
-
- LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);
-
- table.getActiveTimeline().transitionReplaceInflightToComplete(
- clusteringInstant,
- Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
- } catch (Exception e) {
- throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
- } finally {
- this.txnManager.endTransaction(Option.of(clusteringInstant));
- }
- WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime)
- .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
- if (clusteringTimer != null) {
- long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
- HoodieActiveTimeline.parseDateFromInstantTimeSafely(clusteringCommitTime).ifPresent(parsedInstant ->
- metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION)
- );
- }
- LOG.info("Clustering successfully on commit " + clusteringCommitTime);
- }
-
- private void validateClusteringCommit(HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata, String clusteringCommitTime, HoodieTable table) {
- if (clusteringMetadata.getWriteStatuses().isEmpty()) {
- HoodieClusteringPlan clusteringPlan = ClusteringUtils.getClusteringPlan(
- table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(clusteringCommitTime))
- .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
- "Unable to read clustering plan for instant: " + clusteringCommitTime));
- throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + clusteringCommitTime
- + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
- + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
- + " write statuses");
- }
- }
-
- private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata,
- HoodieInstant hoodieInstant) {
- boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction(), hoodieInstant.getTimestamp());
- // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
- // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
- table.getMetadataWriter(hoodieInstant.getTimestamp())
- .ifPresent(writer -> ((HoodieTableMetadataWriter) writer).update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
+ return tableServiceClient.cluster(clusteringInstant, shouldComplete);
}
@Override
@@ -510,44 +347,12 @@ public class SparkRDDWriteClient<T> extends
}
}
- // TODO : To enforce priority between table service and ingestion writer, use transactions here and invoke strategy
- private void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata,
- HoodieTable table,
- String commitInstant) {
-
- switch (tableServiceType) {
- case CLUSTER:
- completeClustering((HoodieReplaceCommitMetadata) metadata, table, commitInstant);
- break;
- case COMPACT:
- completeCompaction(metadata, table, commitInstant);
- break;
- case LOG_COMPACT:
- completeLogCompaction(metadata, table, commitInstant);
- break;
- default:
- throw new IllegalArgumentException("This table service is not valid " + tableServiceType);
- }
- }
-
@Override
protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) {
// Create a Hoodie table after startTxn which encapsulated the commits and files visible.
// Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload
HoodieTable table = createTable(config, hadoopConf);
- Timer.Context conflictResolutionTimer = metrics.getConflictResolutionCtx();
- try {
- TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
- Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), false, this.pendingInflightAndRequestedInstants);
- metrics.emitConflictResolutionSuccessful();
- } catch (HoodieWriteConflictException e) {
- metrics.emitConflictResolutionFailed();
- throw e;
- } finally {
- if (conflictResolutionTimer != null) {
- conflictResolutionTimer.stop();
- }
- }
+ resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
}
@Override
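The conflict-resolution block deleted from preCommit above is now shared through resolveWriteConflict(...). A sketch of the presumed shape of that helper, reconstructed from the deleted lines; the method location, signature and the Set<String> parameter type are assumptions, not the committed implementation:

    // Presumed shared helper in the base client, reconstructed from the lines deleted above.
    protected void resolveWriteConflict(HoodieTable table, HoodieCommitMetadata metadata,
                                        Set<String> pendingInflightAndRequestedInstants) {
      Timer.Context conflictResolutionTimer = metrics.getConflictResolutionCtx();
      try {
        TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
            Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), false,
            pendingInflightAndRequestedInstants);
        metrics.emitConflictResolutionSuccessful();
      } catch (HoodieWriteConflictException e) {
        metrics.emitConflictResolutionFailed();
        throw e;
      } finally {
        if (conflictResolutionTimer != null) {
          conflictResolutionTimer.stop();
        }
      }
    }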
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index d226b5b995a..3a5e940d460 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -573,7 +573,7 @@ public class TestClientRollback extends HoodieClientTestBase {
// the compaction instants should be excluded
metaClient.reloadActiveTimeline();
- assertEquals(0, client.getPendingRollbackInfos(metaClient).size());
+ assertEquals(0, client.getTableServiceClient().getPendingRollbackInfos(metaClient).size());
// verify there is no extra rollback instants
client.rollback(commitTime4);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index ba32aea0b34..38033b9af35 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -2812,8 +2812,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
@Override
- protected Option<String> inlineClustering(Option<Map<String, String>> extraMetadata) {
- throw new HoodieException(CLUSTERING_FAILURE);
+ protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+ if (config.inlineClusteringEnabled()) {
+ throw new HoodieException(CLUSTERING_FAILURE);
+ }
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTableServiceManagerConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTableServiceManagerConfig.java
new file mode 100644
index 00000000000..fa0c8c17251
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTableServiceManagerConfig.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+import org.apache.hudi.common.model.ActionType;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.util.Properties;
+
+/**
+ * Configurations used by the Hudi Table Service Manager.
+ */
+@Immutable
+@ConfigClassProperty(name = "Table Service Manager Configs",
+ groupName = ConfigGroups.Names.WRITE_CLIENT,
+ description = "Configurations used by the Hudi Table Service Manager.")
+public class HoodieTableServiceManagerConfig extends HoodieConfig {
+
+ public static final String TABLE_SERVICE_MANAGER_PREFIX = "hoodie.table.service.manager";
+
+ public static final ConfigProperty<Boolean> TABLE_SERVICE_MANAGER_ENABLED = ConfigProperty
+ .key(TABLE_SERVICE_MANAGER_PREFIX + ".enabled")
+ .defaultValue(false)
+ .withDocumentation("If true, use table service manager to execute table service");
+
+ public static final ConfigProperty<String> TABLE_SERVICE_MANAGER_URIS = ConfigProperty
+ .key(TABLE_SERVICE_MANAGER_PREFIX + ".uris")
+ .defaultValue("http://localhost:9091")
+ .withDocumentation("Table service manager URIs (comma-delimited).");
+
+ public static final ConfigProperty<String> TABLE_SERVICE_MANAGER_ACTIONS = ConfigProperty
+ .key(TABLE_SERVICE_MANAGER_PREFIX + ".actions")
+ .noDefaultValue()
+ .withDocumentation("The actions deployed on table service manager, such as compaction or clean.");
+
+ public static final ConfigProperty<String> TABLE_SERVICE_MANAGER_DEPLOY_USERNAME = ConfigProperty
+ .key(TABLE_SERVICE_MANAGER_PREFIX + ".deploy.username")
+ .defaultValue("default")
+ .withDocumentation("The user name for this table to deploy table services.");
+
+ public static final ConfigProperty<String> TABLE_SERVICE_MANAGER_DEPLOY_QUEUE = ConfigProperty
+ .key(TABLE_SERVICE_MANAGER_PREFIX + ".deploy.queue")
+ .defaultValue("default")
+ .withDocumentation("The queue for this table to deploy table services.");
+
+ public static final ConfigProperty<String> TABLE_SERVICE_MANAGER_DEPLOY_RESOURCES = ConfigProperty
+ .key(TABLE_SERVICE_MANAGER_PREFIX + ".deploy.resources")
+ .defaultValue("spark:4g,4g")
+ .withDocumentation("The resources for this table to use for deploying table services.");
+
+ public static final ConfigProperty<Integer> TABLE_SERVICE_MANAGER_DEPLOY_PARALLELISM = ConfigProperty
+ .key(TABLE_SERVICE_MANAGER_PREFIX + ".deploy.parallelism")
+ .defaultValue(100)
+ .withDocumentation("The parallelism for this table to deploy table services.");
+
+ public static final ConfigProperty<String> TABLE_SERVICE_MANAGER_DEPLOY_EXECUTION_ENGINE = ConfigProperty
+ .key(TABLE_SERVICE_MANAGER_PREFIX + ".execution.engine")
+ .defaultValue("spark")
+ .withDocumentation("The execution engine to deploy for table service of this table, default spark");
+
+ public static final ConfigProperty<String> TABLE_SERVICE_MANAGER_DEPLOY_EXTRA_PARAMS = ConfigProperty
+ .key(TABLE_SERVICE_MANAGER_PREFIX + ".deploy.extra.params")
+ .noDefaultValue()
+ .withDocumentation("The extra params to deploy for table service of this table, split by ';'");
+
+ public static final ConfigProperty<Integer> TABLE_SERVICE_MANAGER_TIMEOUT_SEC = ConfigProperty
+ .key(TABLE_SERVICE_MANAGER_PREFIX + ".connection.timeout.sec")
+ .defaultValue(300)
+ .withDocumentation("Timeout in seconds for connections to table service manager.");
+
+ public static final ConfigProperty<Integer> TABLE_SERVICE_MANAGER_RETRIES = ConfigProperty
+ .key(TABLE_SERVICE_MANAGER_PREFIX + ".connection.retries")
+ .defaultValue(3)
+ .withDocumentation("Number of retries while opening a connection to table service manager");
+
+ public static final ConfigProperty<Integer> TABLE_SERVICE_MANAGER_RETRY_DELAY_SEC = ConfigProperty
+ .key(TABLE_SERVICE_MANAGER_PREFIX + ".connection.retry.delay.sec")
+ .defaultValue(1)
+ .withDocumentation("Number of seconds for the client to wait between consecutive connection attempts");
+
+ public static HoodieTableServiceManagerConfig.Builder newBuilder() {
+ return new HoodieTableServiceManagerConfig.Builder();
+ }
+
+ public boolean isTableServiceManagerEnabled() {
+ return getBoolean(TABLE_SERVICE_MANAGER_ENABLED);
+ }
+
+ public String getTableServiceManagerURIs() {
+ return getStringOrDefault(TABLE_SERVICE_MANAGER_URIS);
+ }
+
+ public String getTableServiceManagerActions() {
+ return getStringOrDefault(TABLE_SERVICE_MANAGER_ACTIONS);
+ }
+
+ public String getDeployUsername() {
+ return getStringOrDefault(TABLE_SERVICE_MANAGER_DEPLOY_USERNAME);
+ }
+
+ public String getDeployQueue() {
+ return getStringOrDefault(TABLE_SERVICE_MANAGER_DEPLOY_QUEUE);
+ }
+
+ public String getDeployResources() {
+ return getStringOrDefault(TABLE_SERVICE_MANAGER_DEPLOY_RESOURCES);
+ }
+
+ public int getDeployParallelism() {
+ return getIntOrDefault(TABLE_SERVICE_MANAGER_DEPLOY_PARALLELISM);
+ }
+
+ public String getDeployExtraParams() {
+ return getStringOrDefault(TABLE_SERVICE_MANAGER_DEPLOY_EXTRA_PARAMS);
+ }
+
+ public String getDeployExecutionEngine() {
+ return getStringOrDefault(TABLE_SERVICE_MANAGER_DEPLOY_EXECUTION_ENGINE);
+ }
+
+ public int getConnectionTimeoutSec() {
+ return getIntOrDefault(TABLE_SERVICE_MANAGER_TIMEOUT_SEC);
+ }
+
+ public int getConnectionRetryLimit() {
+ return getIntOrDefault(TABLE_SERVICE_MANAGER_RETRIES);
+ }
+
+ public int getConnectionRetryDelay() {
+ return getIntOrDefault(TABLE_SERVICE_MANAGER_RETRY_DELAY_SEC);
+ }
+
+ public boolean isEnabledAndActionSupported(ActionType actionType) {
+ return isTableServiceManagerEnabled() && getTableServiceManagerActions().contains(actionType.name());
+ }
+
+ public static class Builder {
+ private final HoodieTableServiceManagerConfig config = new HoodieTableServiceManagerConfig();
+
+ public Builder fromProperties(Properties props) {
+ this.config.getProps().putAll(props);
+ return this;
+ }
+
+ public Builder setURIs(String uris) {
+ config.setValue(TABLE_SERVICE_MANAGER_URIS, uris);
+ return this;
+ }
+
+ public HoodieTableServiceManagerConfig build() {
+ config.setDefaults(HoodieTableServiceManagerConfig.class.getName());
+ return config;
+ }
+ }
+}
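A minimal usage sketch of the new config class (illustrative only; the example class name is an assumption, and the package locations and the ActionType.compaction constant come from existing Hudi code rather than this patch):

    import java.util.Properties;

    import org.apache.hudi.common.model.ActionType;
    import org.apache.hudi.config.HoodieTableServiceManagerConfig;

    public class TableServiceManagerConfigExample {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Keys are built from TABLE_SERVICE_MANAGER_PREFIX ("hoodie.table.service.manager").
        props.put("hoodie.table.service.manager.enabled", "true");
        props.put("hoodie.table.service.manager.actions", "compaction,clean");

        HoodieTableServiceManagerConfig tsmConfig = HoodieTableServiceManagerConfig.newBuilder()
            .fromProperties(props)
            .setURIs("http://localhost:9091")
            .build();

        // Delegation happens only when the manager is enabled and the action is listed.
        boolean delegateCompaction = tsmConfig.isEnabledAndActionSupported(ActionType.compaction);
        System.out.println("Delegate compaction to TSM: " + delegateCompaction);
      }
    }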
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index b451c364186..5acbe55fe8b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -281,7 +281,7 @@ public class HoodieFlinkClusteringJob {
if (table.getMetaClient().getActiveTimeline().containsInstant(inflightInstant)) {
LOG.info("Rollback inflight clustering instant: [" + clusteringInstant + "]");
table.rollbackInflightClustering(inflightInstant,
- commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
+ commitToRollback -> writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
table.getMetaClient().reloadActiveTimeline();
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
index 34352ec80bc..b183ba3a4b0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
@@ -78,7 +78,7 @@ public class ClusteringUtil {
inflightInstants.forEach(inflightInstant -> {
LOG.info("Rollback the inflight clustering instant: " + inflightInstant + " for failover");
table.rollbackInflightClustering(inflightInstant,
- commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
+ commitToRollback -> writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
table.getMetaClient().reloadActiveTimeline();
});
}
@@ -95,7 +95,7 @@ public class ClusteringUtil {
if (table.getMetaClient().reloadActiveTimeline().filterPendingReplaceTimeline().containsInstant(inflightInstant)) {
LOG.warn("Rollback failed clustering instant: [" + instantTime + "]");
table.rollbackInflightClustering(inflightInstant,
- commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
+ commitToRollback -> writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
}
}
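Both Flink call sites above now obtain pending-rollback info through writeClient.getTableServiceClient(). A minimal sketch of the relocated call, consolidated in one helper (illustrative only; the class and helper names are assumptions, and only the getTableServiceClient().getPendingRollbackInfo(...) chain comes from this patch):

    import org.apache.hudi.client.HoodieFlinkWriteClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.table.HoodieFlinkTable;

    class ClusteringRollbackSketch {
      // Rolls back a single inflight clustering instant via the relocated lookup.
      static void rollbackInflightClustering(HoodieFlinkTable<?> table,
                                             HoodieFlinkWriteClient<?> writeClient,
                                             HoodieInstant inflightInstant) {
        table.rollbackInflightClustering(inflightInstant,
            commitToRollback -> writeClient.getTableServiceClient()
                .getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
        table.getMetaClient().reloadActiveTimeline();
      }
    }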
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/SparkRDDWriteClientOverride.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/SparkRDDWriteClientOverride.java
deleted file mode 100644
index 20982b5cda6..00000000000
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/SparkRDDWriteClientOverride.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.functional;
-
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.config.HoodieWriteConfig;
-
-// Sole purpose of this class is to provide access to otherwise API inaccessible from the tests.
-// While it's certainly not a great pattern, it would require substantial test restructuring to
-// eliminate such access to an internal API, so this is considered acceptable given it's very limited
-// scope (w/in the current package)
-class SparkRDDWriteClientOverride extends org.apache.hudi.client.SparkRDDWriteClient {
-
- public SparkRDDWriteClientOverride(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
- super(context, clientConfig);
- }
-
- @Override
- public void rollbackFailedBootstrap() {
- super.rollbackFailedBootstrap();
- }
-}
-
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index 543e01702d3..22d2e934d53 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -20,6 +20,7 @@ package org.apache.hudi.functional;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
@@ -79,7 +80,6 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
import org.junit.jupiter.api.AfterEach;
@@ -129,11 +129,10 @@ public class TestBootstrap extends HoodieClientTestBase {
private HoodieParquetRealtimeInputFormat rtInputFormat;
private JobConf rtJobConf;
- private SparkSession spark;
@BeforeEach
public void setUp() throws Exception {
- bootstrapBasePath = tmpFolder.toAbsolutePath().toString() + "/data";
+ bootstrapBasePath = tmpFolder.toAbsolutePath() + "/data";
initPath();
initSparkContexts();
initTestDataGenerator();
@@ -254,7 +253,7 @@ public class TestBootstrap extends HoodieClientTestBase {
.withBootstrapModeSelector(bootstrapModeSelectorClass).build())
.build();
- SparkRDDWriteClientOverride client = new SparkRDDWriteClientOverride(context, config);
+ SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
client.bootstrap(Option.empty());
checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);
@@ -263,7 +262,7 @@ public class TestBootstrap extends HoodieClientTestBase {
HoodieActiveTimeline.deleteInstantFile(metaClient.getFs(), metaClient.getMetaPath(), new HoodieInstant(State.COMPLETED,
deltaCommit ? HoodieTimeline.DELTA_COMMIT_ACTION : HoodieTimeline.COMMIT_ACTION, bootstrapCommitInstantTs));
metaClient.reloadActiveTimeline();
- client.rollbackFailedBootstrap();
+ client.getTableServiceClient().rollbackFailedBootstrap();
metaClient.reloadActiveTimeline();
assertEquals(0, metaClient.getCommitsTimeline().countInstants());
assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), basePath, context)
@@ -271,9 +270,10 @@ public class TestBootstrap extends HoodieClientTestBase {
BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
assertFalse(index.useIndex());
+ client.close();
// Run bootstrap again
- client = new SparkRDDWriteClientOverride(context, config);
+ client = new SparkRDDWriteClient(context, config);
client.bootstrap(Option.empty());
metaClient.reloadActiveTimeline();
@@ -307,6 +307,7 @@ public class TestBootstrap extends HoodieClientTestBase {
numInstantsAfterBootstrap + 2, 2, updateTimestamp, updateTimestamp, !deltaCommit,
Arrays.asList(compactionInstant.get()), !config.isPreserveHoodieCommitMetadataForCompaction());
}
+ client.close();
}
@Test
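With SparkRDDWriteClientOverride removed, the bootstrap tests use SparkRDDWriteClient directly and reach failed-bootstrap cleanup through the table service client. A minimal sketch of that call path (illustrative only; the class and helper names are assumptions):

    import org.apache.hudi.client.SparkRDDWriteClient;
    import org.apache.hudi.common.table.HoodieTableMetaClient;

    class FailedBootstrapCleanupSketch {
      // Rolls back a bootstrap whose commit never completed, then refreshes the timeline.
      static void cleanupFailedBootstrap(SparkRDDWriteClient<?> client, HoodieTableMetaClient metaClient) {
        client.getTableServiceClient().rollbackFailedBootstrap();
        metaClient.reloadActiveTimeline();
      }
    }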
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
index a236755d072..cc9b4cefcb6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
@@ -21,6 +21,7 @@ package org.apache.hudi.functional;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
@@ -50,7 +51,6 @@ import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
-import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
@@ -74,7 +74,6 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
import org.junit.jupiter.api.AfterEach;
@@ -109,9 +108,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("functional")
public class TestOrcBootstrap extends HoodieClientTestBase {
-
- public static final String TRIP_HIVE_COLUMN_TYPES = "bigint,string,string,string,double,double,double,double,"
- + "struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";
@TempDir
public java.nio.file.Path tmpFolder;
@@ -120,13 +116,9 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
private HoodieParquetInputFormat roInputFormat;
private JobConf roJobConf;
- private HoodieParquetRealtimeInputFormat rtInputFormat;
- private JobConf rtJobConf;
- private SparkSession spark;
-
@BeforeEach
public void setUp() throws Exception {
- bootstrapBasePath = tmpFolder.toAbsolutePath().toString() + "/data";
+ bootstrapBasePath = tmpFolder.toAbsolutePath() + "/data";
initPath();
initSparkContexts();
initTestDataGenerator();
@@ -251,7 +243,7 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
.withBootstrapModeSelector(bootstrapModeSelectorClass).build())
.build();
- SparkRDDWriteClientOverride client = new SparkRDDWriteClientOverride(context, config);
+ SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
client.bootstrap(Option.empty());
checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);
@@ -262,7 +254,7 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
} else {
FileCreateUtils.deleteCommit(metaClient.getBasePath(), bootstrapCommitInstantTs);
}
- client.rollbackFailedBootstrap();
+ client.getTableServiceClient().rollbackFailedBootstrap();
metaClient.reloadActiveTimeline();
assertEquals(0, metaClient.getCommitsTimeline().countInstants());
assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), basePath, context)
@@ -270,9 +262,10 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
assertFalse(index.useIndex());
+ client.close();
// Run bootstrap again
- client = new SparkRDDWriteClientOverride(context, config);
+ client = new SparkRDDWriteClient(context, config);
client.bootstrap(Option.empty());
metaClient.reloadActiveTimeline();
@@ -306,6 +299,7 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
numInstantsAfterBootstrap + 2, 2, updateTimestamp, updateTimestamp, !deltaCommit,
Arrays.asList(compactionInstant.get()), !config.isPreserveHoodieCommitMetadataForCompaction());
}
+ client.close();
}
@Test
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index f6905f92d94..3c97b732eb6 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -196,7 +196,7 @@ public class HoodieClusteringJob {
throw new HoodieClusteringException("There is no scheduled clustering in the table.");
}
}
- Option<HoodieCommitMetadata> commitMetadata = client.cluster(cfg.clusteringInstantTime, true).getCommitMetadata();
+ Option<HoodieCommitMetadata> commitMetadata = client.cluster(cfg.clusteringInstantTime).getCommitMetadata();
return UtilHelpers.handleErrors(commitMetadata.get(), cfg.clusteringInstantTime);
}
@@ -252,7 +252,7 @@ public class HoodieClusteringJob {
LOG.info("The schedule instant time is " + instantTime.get());
LOG.info("Step 2: Do cluster");
- Option<HoodieCommitMetadata> metadata = client.cluster(instantTime.get(), true).getCommitMetadata();
+ Option<HoodieCommitMetadata> metadata = client.cluster(instantTime.get()).getCommitMetadata();
return UtilHelpers.handleErrors(metadata.get(), instantTime.get());
}
}
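Both HoodieClusteringJob call sites (and the DeltaSync one below) now use a single-argument cluster(instantTime) overload in place of cluster(instantTime, true). A minimal sketch of the schedule-then-execute flow under that overload (illustrative only; the class and helper names are assumptions, and scheduleClustering is the pre-existing scheduling API):

    import org.apache.hudi.client.SparkRDDWriteClient;
    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.common.util.Option;

    class ClusteringRunSketch {
      // Schedules a clustering plan and executes it with the single-argument overload.
      static Option<HoodieCommitMetadata> scheduleAndCluster(SparkRDDWriteClient<?> client) {
        Option<String> instantTime = client.scheduleClustering(Option.empty());
        if (!instantTime.isPresent()) {
          return Option.empty();
        }
        return client.cluster(instantTime.get()).getCommitMetadata();
      }
    }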
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index e57ac513a8e..10dbf998a5f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -386,7 +386,7 @@ public class DeltaSync implements Serializable, Closeable {
if (cfg.retryLastPendingInlineClusteringJob && getHoodieClientConfig(this.schemaProvider).inlineClusteringEnabled()) {
Option<String> pendingClusteringInstant = getLastPendingClusteringInstant(allCommitsTimelineOpt);
if (pendingClusteringInstant.isPresent()) {
- writeClient.cluster(pendingClusteringInstant.get(), true);
+ writeClient.cluster(pendingClusteringInstant.get());
}
}