Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2023/01/05 05:17:53 UTC

[GitHub] [hudi] yuzhaojing commented on a diff in pull request #6732: [HUDI-4148] Add client for hudi table service manager

yuzhaojing commented on code in PR #6732:
URL: https://github.com/apache/hudi/pull/6732#discussion_r1062116644


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -0,0 +1,840 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.async.AsyncArchiveService;
+import org.apache.hudi.async.AsyncCleanerService;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.client.heartbeat.HeartbeatUtils;
+import org.apache.hudi.common.HoodiePendingRollbackInfo;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableServiceType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.rollback.RollbackUtils;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient implements RunsTableService {
+
+  private static final Logger LOG = LogManager.getLogger(BaseHoodieTableServiceClient.class);
+
+  protected transient Timer.Context compactionTimer;
+  protected transient Timer.Context clusteringTimer;
+  protected transient Timer.Context logCompactionTimer;
+
+  protected transient AsyncCleanerService asyncCleanerService;
+  protected transient AsyncArchiveService asyncArchiveService;
+
+  protected BaseHoodieTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+    super(context, clientConfig, Option.empty());
+  }
+
+  protected void startAsyncCleanerService(BaseHoodieWriteClient writeClient) {
+    if (this.asyncCleanerService == null) {
+      this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(writeClient);
+    } else {
+      this.asyncCleanerService.start(null);
+    }
+  }
+
+  protected void startAsyncArchiveService(BaseHoodieWriteClient writeClient) {
+    if (this.asyncArchiveService == null) {
+      this.asyncArchiveService = AsyncArchiveService.startAsyncArchiveIfEnabled(writeClient);
+    } else {
+      this.asyncArchiveService.start(null);
+    }
+  }
+
+  protected void asyncClean() {
+    AsyncCleanerService.waitForCompletion(asyncCleanerService);
+  }
+
+  protected void asyncArchive() {
+    AsyncArchiveService.waitForCompletion(asyncArchiveService);
+  }
+
+  protected void setTableServiceTimer(WriteOperationType operationType) {
+    switch (operationType) {
+      case CLUSTER:
+        clusteringTimer = metrics.getClusteringCtx();
+        break;
+      case COMPACT:
+        compactionTimer = metrics.getCompactionCtx();
+        break;
+      case LOG_COMPACT:
+        logCompactionTimer = metrics.getLogCompactionCtx();
+        break;
+      default:
+    }
+  }
+
+  /**
+   * Performs a compaction operation on a table, serially before or after an insert/upsert action.
+   * Scheduling and execution are done inline.
+   */
+  protected Option<String> inlineCompaction(Option<Map<String, String>> extraMetadata) {
+    Option<String> compactionInstantTimeOpt = inlineScheduleCompaction(extraMetadata);
+    compactionInstantTimeOpt.ifPresent(compactInstantTime -> {
+      // inline compaction should auto commit as the user is never given control
+      compact(compactInstantTime, true);
+    });
+    return compactionInstantTimeOpt;
+  }
+
+  private void inlineCompaction(HoodieTable table, Option<Map<String, String>> extraMetadata) {
+    if (shouldDelegateToTableServiceManager(config, ActionType.compaction)) {
+      scheduleCompaction(extraMetadata);
+    } else {
+      runAnyPendingCompactions(table);
+      inlineCompaction(extraMetadata);
+    }
+  }
+
+  /**
+   * Ensures the log compaction instant is in the expected state and performs log compaction for the workload stored in the instant time.
+   *
+   * @param compactionInstantTime Compaction Instant Time
+   * @return Collection of Write Status
+   */
+  protected HoodieWriteMetadata<O> logCompact(String compactionInstantTime, boolean shouldComplete) {
+    throw new UnsupportedOperationException("Log compaction is not supported yet.");
+  }
+
+  /**
+   * Performs a log compaction operation on a table, serially before or after an insert/upsert action.
+   */
+  protected Option<String> inlineLogCompact(Option<Map<String, String>> extraMetadata) {
+    Option<String> logCompactionInstantTimeOpt = scheduleLogCompaction(extraMetadata);
+    logCompactionInstantTimeOpt.ifPresent(logCompactInstantTime -> {
+      // inline log compaction should auto commit as the user is never given control
+      logCompact(logCompactInstantTime, true);
+    });
+    return logCompactionInstantTimeOpt;
+  }
+
+  protected void runAnyPendingCompactions(HoodieTable table) {
+    table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().getInstants()
+        .forEach(instant -> {
+          LOG.info("Running previously failed inflight compaction at instant " + instant);
+          compact(instant.getTimestamp(), true);
+        });
+  }
+
+  protected void runAnyPendingLogCompactions(HoodieTable table) {
+    table.getActiveTimeline().getWriteTimeline().filterPendingLogCompactionTimeline().getInstantsAsStream()
+        .forEach(instant -> {
+          LOG.info("Running previously failed inflight log compaction at instant " + instant);
+          logCompact(instant.getTimestamp(), true);
+        });
+  }
+
+  /**
+   * Schedules compaction inline.
+   *
+   * @param extraMetadata extra metadata to be used.
+   * @return compaction instant if scheduled.
+   */
+  protected Option<String> inlineScheduleCompaction(Option<Map<String, String>> extraMetadata) {
+    return scheduleCompaction(extraMetadata);
+  }
+
+  /**
+   * Schedules a new compaction instant.
+   *
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Ensures the compaction instant is in the expected state and performs compaction for the workload stored in the instant time.
+   *
+   * @param compactionInstantTime Compaction Instant Time
+   * @return Collection of Write Status
+   */
+  protected abstract HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean shouldComplete);
+
+  /**
+   * Commit a compaction operation. Allows passing additional metadata to be stored in the commit instant file.
+   *
+   * @param compactionInstantTime Compaction Instant Time
+   * @param metadata              All the metadata that gets stored along with a commit
+   * @param extraMetadata         Extra Metadata to be stored
+   */
+  public abstract void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata,
+                                        Option<Map<String, String>> extraMetadata);
+
+  /**
+   * Commit Compaction and track metrics.
+   */
+  protected abstract void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime);
+
+  /**
+   * Schedules a new log compaction instant.
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public Option<String> scheduleLogCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleLogCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Schedules a new log compaction instant with passed-in instant time.
+   * @param instantTime Log Compaction Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public boolean scheduleLogCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.LOG_COMPACT).isPresent();
+  }
+
+  /**
+   * Performs log compaction for the workload stored in the instant time.
+   *
+   * @param logCompactionInstantTime Log Compaction Instant Time
+   * @return Collection of WriteStatus to inspect errors and counts
+   */
+  public HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime) {
+    return logCompact(logCompactionInstantTime, config.shouldAutoCommit());
+  }
+
+  /**
+   * Commit Log Compaction and track metrics.
+   */
+  protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable table, String logCompactionCommitTime) {
+    throw new UnsupportedOperationException("Log compaction is not supported yet.");
+  }
+
+
+  /**
+   * Schedules a new compaction instant with passed-in instant time.
+   *
+   * @param instantTime   Compaction Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.COMPACT).isPresent();
+  }
+
+  /**
+   * Schedules a new clustering instant.
+   *
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public Option<String> scheduleClustering(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleClusteringAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Schedules a new clustering instant with passed-in instant time.
+   *
+   * @param instantTime   clustering Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  public boolean scheduleClusteringAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.CLUSTER).isPresent();
+  }
+
+  /**
+   * Schedules a new cleaning instant.
+   *
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  protected Option<String> scheduleCleaning(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleCleaningAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
+  }
+
+  /**
+   * Schedules a new cleaning instant with passed-in instant time.
+   *
+   * @param instantTime   cleaning Instant Time
+   * @param extraMetadata Extra Metadata to be stored
+   */
+  protected boolean scheduleCleaningAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
+    return scheduleTableService(instantTime, extraMetadata, TableServiceType.CLEAN).isPresent();
+  }
+
+  /**
+   * Ensures the clustering instant is in the expected state and performs clustering for the plan stored in the instant's metadata.
+   *
+   * @param clusteringInstant Clustering Instant Time
+   * @return Collection of Write Status
+   */
+  public abstract HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldComplete);
+
+  protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+    if (!tableServicesEnabled(config)) {
+      return;
+    }
+
+    if (!config.areAnyTableServicesExecutedInline() && !config.areAnyTableServicesScheduledInline()) {
+      return;
+    }
+
+    if (config.isMetadataTableEnabled()) {
+      table.getHoodieView().sync();
+    }
+    // Do an inline compaction if enabled
+    if (config.inlineCompactionEnabled()) {
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
+      inlineCompaction(table, extraMetadata);
+    } else {
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
+    }
+
+    // if only inline scheduling is enabled
+    if (!config.inlineCompactionEnabled() && config.scheduleInlineCompaction()
+        && table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().empty()) {
+      // proceed only if there are no pending compactions
+      metadata.addMetadata(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), "true");
+      inlineScheduleCompaction(extraMetadata);
+    }
+
+    // Do an inline log compaction if enabled
+    if (config.inlineLogCompactionEnabled()) {
+      runAnyPendingLogCompactions(table);
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "true");
+      inlineLogCompact(extraMetadata);
+    } else {
+      metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "false");
+    }
+
+    // Do an inline clustering if enabled
+    if (config.inlineClusteringEnabled()) {
+      metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");
+      inlineClustering(table, extraMetadata);
+    } else {
+      metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "false");
+    }
+
+    // if only inline scheduling is enabled
+    if (!config.inlineClusteringEnabled() && config.scheduleInlineClustering()
+        && table.getActiveTimeline().filterPendingReplaceTimeline().empty()) {
+      // proceed only if there is no pending clustering
+      metadata.addMetadata(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), "true");
+      inlineScheduleClustering(extraMetadata);
+    }
+  }
+
+  /**
+   * Schedule table services such as clustering, compaction & cleaning.
+   *
+   * @param extraMetadata    Metadata to pass onto the scheduled service instant
+   * @param tableServiceType Type of table service to schedule
+   * @return the provided instant time if the table service was scheduled, otherwise an empty Option
+   */
+  public Option<String> scheduleTableService(String instantTime, Option<Map<String, String>> extraMetadata,
+                                             TableServiceType tableServiceType) {
+    // A lock is required to guard against race conditions between an ongoing writer and scheduling a table service.
+    final Option<HoodieInstant> inflightInstant = Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED,
+        tableServiceType.getAction(), instantTime));
+    try {
+      this.txnManager.beginTransaction(inflightInstant, Option.empty());
+      LOG.info("Scheduling table service " + tableServiceType);
+      return scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType);
+    } finally {
+      this.txnManager.endTransaction(inflightInstant);
+    }
+  }
+
+  protected Option<String> scheduleTableServiceInternal(String instantTime, Option<Map<String, String>> extraMetadata,
+                                                        TableServiceType tableServiceType) {
+    if (!tableServicesEnabled(config)) {
+      return Option.empty();
+    }
+
+    Option<String> option = Option.empty();
+    HoodieTable<?, ?, ?, ?> table = createTable(config, hadoopConf);
+
+    switch (tableServiceType) {
+      case ARCHIVE:
+        LOG.info("Scheduling archiving is not supported. Skipping.");
+        break;
+      case CLUSTER:
+        LOG.info("Scheduling clustering at instant time: " + instantTime);
+        Option<HoodieClusteringPlan> clusteringPlan = table
+            .scheduleClustering(context, instantTime, extraMetadata);
+        option = clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      case COMPACT:
+        LOG.info("Scheduling compaction at instant time: " + instantTime);
+        Option<HoodieCompactionPlan> compactionPlan = table
+            .scheduleCompaction(context, instantTime, extraMetadata);
+        option = compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      case LOG_COMPACT:
+        LOG.info("Scheduling log compaction at instant time: " + instantTime);
+        Option<HoodieCompactionPlan> logCompactionPlan = table
+            .scheduleLogCompaction(context, instantTime, extraMetadata);
+        option = logCompactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      case CLEAN:
+        LOG.info("Scheduling cleaning at instant time: " + instantTime);
+        Option<HoodieCleanerPlan> cleanerPlan = table
+            .scheduleCleaning(context, instantTime, extraMetadata);
+        option = cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+        break;
+      default:
+        throw new IllegalArgumentException("Invalid TableService " + tableServiceType);
+    }
+
+    Option<String> instantRange = delegateToTableServiceManager(tableServiceType, table);
+    if (instantRange.isPresent()) {
+      LOG.info("Delegate instant [" + instantRange.get() + "] to table service manager");
+    }
+
+    return option;
+  }
+
+  protected abstract HoodieTable<?, ?, ?, ?> createTable(HoodieWriteConfig config, Configuration hadoopConf);
+
+  /**
+   * Executes a clustering plan on a table, serially before or after an insert/upsert action.
+   * Schedules and executes clustering inline.
+   */
+  protected Option<String> inlineClustering(Option<Map<String, String>> extraMetadata) {
+    Option<String> clusteringInstantOpt = inlineScheduleClustering(extraMetadata);
+    clusteringInstantOpt.ifPresent(clusteringInstant -> {
+      // inline clustering should auto commit as the user is never given control
+      cluster(clusteringInstant, true);
+    });
+    return clusteringInstantOpt;
+  }
+
+  private void inlineClustering(HoodieTable table, Option<Map<String, String>> extraMetadata) {
+    if (shouldDelegateToTableServiceManager(config, ActionType.replacecommit)) {
+      scheduleClustering(extraMetadata);
+    } else {
+      runAnyPendingClustering(table);
+      inlineClustering(extraMetadata);
+    }
+  }
+
+  /**
+   * Schedules clustering inline.
+   *
+   * @param extraMetadata extra metadata to use.
+   * @return clustering instant if scheduled.
+   */
+  protected Option<String> inlineScheduleClustering(Option<Map<String, String>> extraMetadata) {
+    return scheduleClustering(extraMetadata);
+  }
+
+  protected void runAnyPendingClustering(HoodieTable table) {
+    table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().forEach(instant -> {
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant);
+      if (instantPlan.isPresent()) {
+        LOG.info("Running pending clustering at instant " + instantPlan.get().getLeft());
+        cluster(instant.getTimestamp(), true);
+      }
+    });
+  }
+
+  /**
+   * Finalize Write operation.
+   *
+   * @param table       HoodieTable
+   * @param instantTime Instant Time
+   * @param stats       Hoodie Write Stat
+   */
+  protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieWriteStat> stats) {

Review Comment:
   This is a good idea and I will extract it to BaseHoodieClient.
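   For reference, a rough sketch of what the extracted helper could look like once it is hoisted into BaseHoodieClient, so that the write client and the table service client share a single copy. The metrics hooks and exception handling below follow the existing write-client pattern and are illustrative only; they are not the final change in this PR:

       // Illustrative sketch, not part of this PR: finalizeWrite shared via BaseHoodieClient.
       // Assumes the metrics, context, and LOG members are available to both subclasses,
       // and uses only types already imported in the diff above.
       protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieWriteStat> stats) {
         try {
           final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
           // Let the table reconcile and validate the written files before the commit completes.
           table.finalizeWrite(context, instantTime, stats);
           if (finalizeCtx != null) {
             Option<Long> durationInMs = Option.of(metrics.getDurationInMs(finalizeCtx.stop()));
             durationInMs.ifPresent(duration -> {
               LOG.info("Finalize write elapsed time (milliseconds): " + duration);
               metrics.updateFinalizeWriteMetrics(duration, stats.size());
             });
           }
         } catch (HoodieIOException ioe) {
           throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
         }
       }

   Both BaseHoodieWriteClient and BaseHoodieTableServiceClient could then delegate to this single method instead of keeping duplicate implementations.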



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org