Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/09/24 22:03:04 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #3590: [HUDI-2285][HUDI-2476] Metadata table synchronous design. Rebased and Squashed from pull/3426

vinothchandar commented on a change in pull request #3590:
URL: https://github.com/apache/hudi/pull/3590#discussion_r715912832



##########
File path: hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
##########
@@ -440,14 +440,6 @@ public String compareCommits(@CliOption(key = {"path"}, help = "Path of the tabl
     }
   }
 
-  @CliCommand(value = "commits sync", help = "Compare commits with another Hoodie table")
-  public String syncCommits(@CliOption(key = {"path"}, help = "Path of the table to compare to") final String path) {
-    HoodieCLI.syncTableMetadata = HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(path).build();

Review comment:
       I understand why this is being removed. But we may want the ability to add a new metadata partition in the background, using, say, a `CREATE INDEX` statement. While it is being built, new commits could arrive that need to be synced. Let's file a subtask to deal with this.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -188,6 +189,7 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
         lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
     try {
       preCommit(instantTime, metadata);
+      table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, instantTime));

Review comment:
       why can't this be inside `preCommit()`?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -273,6 +275,7 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
   public void rollbackFailedBootstrap() {
     LOG.info("Rolling back pending bootstrap if present");
     HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
+    table.getHoodieView().sync();

Review comment:
       need to remove? 

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -81,13 +84,16 @@
 public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWriter {
 
   private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadataWriter.class);
+  private static final Integer MAX_BUCKET_COUNT = 9999;
+  private static final  String BUCKET_PREFIX = "bucket-";

Review comment:
       Instead of `bucket-`, what if we use the partition name, i.e. `files-xxxx`?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -309,6 +303,7 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
       .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
 
     initTableMetadata();
+    initializeBuckets(datasetMetaClient, MetadataPartitionType.FILES.partitionPath(), createInstantTime, 1);

Review comment:
       initializeFileGroups?

##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
##########
@@ -71,6 +72,7 @@ public void init() throws Exception {
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
         .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())

Review comment:
       Let's track all of these and eventually fix them.

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -298,28 +306,38 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteSt
                                     String compactionCommitTime) {
     this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
     List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
-    finalizeWrite(table, compactionCommitTime, writeStats);
-    LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
-    SparkCompactHelpers.newInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
-    WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime)
-        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-
-    if (compactionTimer != null) {
-      long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
-      try {
-        metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(),
-            durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
-      } catch (ParseException e) {
-        throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
-            + config.getBasePath() + " at time " + compactionCommitTime, e);
+    try {
+      // TODO: check if we need HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, heartbeatClient, config) here.
+      this.txnManager.beginTransaction(Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime)), Option.empty());
+      // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
+      // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
+      table.getMetadataWriter().ifPresent(w -> w.update(metadata, compactionCommitTime));
+      // commit to data table after committing to metadata table.FlinkHoodieBackedTableMetadataWriter

Review comment:
       mentioning Flink in Spark module?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -499,8 +510,20 @@ public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
   @Override
   public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) {
     if (enabled) {
-      List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(metaClient.getActiveTimeline(),
-          rollbackMetadata, instantTime, metadata.getUpdateTime());
+      // Is this rollback of an instant that has been synced to the metadata table?
+      String rollbackInstant = rollbackMetadata.getCommitsRollback().get(0);

Review comment:
       This `get(0)` is something to fix.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
##########
@@ -124,6 +127,19 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
     }
   }
 
+  /**
+   * Update metadata table if available. Any update to metadata table happens within data table lock.
+   * @param cleanMetadata intance of {@link HoodieCleanMetadata} to be applied to metadata.
+   */
+  private void writeToMetadata(HoodieCleanMetadata cleanMetadata) {

Review comment:
       writeMetadata

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -401,64 +396,86 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
   }
 
   /**
-   * Sync the Metadata Table from the instants created on the dataset.
+   * Initialize buckets for a partition. For file listing, we just have one bucket. But for record level index, we might have N number of buckets
+   * per partition. Technically speaking buckets here map to FileGroups in Hudi.
    *
-   * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+   * Each bucket maps to FileGroups in Hudi and is represented with the following format:
+   *    bucket-ABCD
+   * where ABCD are digits. This allows up to 9999 buckets.
+   *
+   * Example:
+   *    bucket-0001
+   *    bucket-0002
    */
-  private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) {
-    ValidationUtils.checkState(enabled, "Metadata table cannot be synced as it is not enabled");
-    // (re) init the metadata for reading.
-    initTableMetadata();
-    try {
-      List<HoodieInstant> instantsToSync = metadata.findInstantsToSyncForWriter();
-      if (instantsToSync.isEmpty()) {
-        return;
-      }
-
-      LOG.info("Syncing " + instantsToSync.size() + " instants to metadata table: " + instantsToSync);
-
-      // Read each instant in order and sync it to metadata table
-      for (HoodieInstant instant : instantsToSync) {
-        LOG.info("Syncing instant " + instant + " to metadata table");
-
-        Option<List<HoodieRecord>> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient,
-            metaClient.getActiveTimeline(), instant, metadata.getUpdateTime());
-        if (records.isPresent()) {
-          commit(records.get(), MetadataPartitionType.FILES.partitionPath(), instant.getTimestamp());
-        }
+  private void initializeBuckets(HoodieTableMetaClient datasetMetaClient, String partition, String instantTime,
+                                 int bucketCount) throws IOException {
+    ValidationUtils.checkArgument(bucketCount <= MAX_BUCKET_COUNT, "Maximum  " + MAX_BUCKET_COUNT  + " buckets are supported.");
+
+    final HashMap<HeaderMetadataType, String> blockHeader = new HashMap<>();
+    blockHeader.put(HeaderMetadataType.INSTANT_TIME, instantTime);
+    // Archival of data table has a dependency on compaction(base files) in metadata table.
+    // It is assumed that as of time Tx of base instant (/compaction time) in metadata table,
+    // all commits in data table is in sync with metadata table. So, we always create start with log file for any bucket.
+    // but we have to work on relaxing that in future : https://issues.apache.org/jira/browse/HUDI-2458
+    final HoodieDeleteBlock block = new HoodieDeleteBlock(new HoodieKey[0], blockHeader);
+
+    LOG.info(String.format("Creating %d buckets for partition %s with base fileId %s at instant time %s",
+        bucketCount, partition, BUCKET_PREFIX, instantTime));
+    for (int i = 0; i < bucketCount; ++i) {
+      final String bucketFileId = String.format("%s%04d", BUCKET_PREFIX, i + 1);
+      try {
+        // since all shards are initialized in driver, we don't need to create a random write token.
+        String writeToken = FSUtils.makeWriteToken(0, 0, 0);

Review comment:
       any constants to reuse?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -126,8 +129,13 @@ public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Op
 
   @Override
   protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> createTable(HoodieWriteConfig config,
-                                                                                                           Configuration hadoopConf) {
-    return HoodieSparkTable.create(config, context);
+                                                                                                           Configuration hadoopConf,
+                                                                                                           boolean refreshTimeline) {

Review comment:
       format

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -401,64 +396,86 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
   }
 
   /**
-   * Sync the Metadata Table from the instants created on the dataset.
+   * Initialize buckets for a partition. For file listing, we just have one bucket. But for record level index, we might have N number of buckets
+   * per partition. Technically speaking buckets here map to FileGroups in Hudi.
    *
-   * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+   * Each bucket maps to FileGroups in Hudi and is represented with the following format:
+   *    bucket-ABCD
+   * where ABCD are digits. This allows up to 9999 buckets.
+   *
+   * Example:
+   *    bucket-0001
+   *    bucket-0002
    */
-  private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) {
-    ValidationUtils.checkState(enabled, "Metadata table cannot be synced as it is not enabled");
-    // (re) init the metadata for reading.
-    initTableMetadata();
-    try {
-      List<HoodieInstant> instantsToSync = metadata.findInstantsToSyncForWriter();
-      if (instantsToSync.isEmpty()) {
-        return;
-      }
-
-      LOG.info("Syncing " + instantsToSync.size() + " instants to metadata table: " + instantsToSync);
-
-      // Read each instant in order and sync it to metadata table
-      for (HoodieInstant instant : instantsToSync) {
-        LOG.info("Syncing instant " + instant + " to metadata table");
-
-        Option<List<HoodieRecord>> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient,
-            metaClient.getActiveTimeline(), instant, metadata.getUpdateTime());
-        if (records.isPresent()) {
-          commit(records.get(), MetadataPartitionType.FILES.partitionPath(), instant.getTimestamp());
-        }
+  private void initializeBuckets(HoodieTableMetaClient datasetMetaClient, String partition, String instantTime,

Review comment:
        partitionPath

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -81,13 +84,16 @@
 public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWriter {
 
   private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadataWriter.class);
+  private static final Integer MAX_BUCKET_COUNT = 9999;

Review comment:
       Let's make this 99999? Do we even need this? It is only used to format the bucket name, right? Can't we figure this out dynamically? I.e., if `numBuckets=9999`, then you know there need to be four digits, so the numeric part of the bucket string should have four characters.
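
       A minimal sketch of the dynamic-width idea, not the PR's code. The class `FileGroupIds` and its methods are hypothetical names, and the prefix is supplied by the caller (for example the partition name `files`). The pad width is derived from the requested count, so no fixed maximum is needed just for formatting.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Hudi: derives zero-padded file group ids. */
public final class FileGroupIds {

  private FileGroupIds() {
  }

  /** Number of decimal digits needed to print {@code count}, e.g. 9999 needs 4. */
  static int digitsFor(int count) {
    if (count < 1) {
      throw new IllegalArgumentException("count must be >= 1, got " + count);
    }
    return String.valueOf(count).length();
  }

  /** Builds ids prefix-1 .. prefix-count, zero-padded to a common width. */
  static List<String> generate(String prefix, int count) {
    int width = digitsFor(count);
    List<String> ids = new ArrayList<>(count);
    for (int i = 1; i <= count; i++) {
      // For width 4 the format string becomes "%s-%04d".
      ids.add(String.format("%s-%0" + width + "d", prefix, i));
    }
    return ids;
  }
}
```

       One trade-off to keep in mind: if the count later grows past a power of ten, newly generated ids would have a different width than existing ones, so a fixed, generous width may still be the simpler choice.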
   

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -401,64 +396,86 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
   }
 
   /**
-   * Sync the Metadata Table from the instants created on the dataset.
+   * Initialize buckets for a partition. For file listing, we just have one bucket. But for record level index, we might have N number of buckets

Review comment:
       We should avoid leaking buckets everywhere. Unless we cannot explain the code with just file groups, we should not be using buckets, IMO.

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngrade.java
##########
@@ -49,7 +49,9 @@ public void run(HoodieTableMetaClient metaClient, HoodieTableVersion toVersion,
       return new ZeroToOneUpgradeHandler().upgrade(config, context, instantTime);
     } else if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.TWO) {
       return new OneToTwoUpgradeHandler().upgrade(config, context, instantTime);
-    } else {
+    }  else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.THREE) {

Review comment:
       Let's add a JIRA to track everything that finally needs to be done for the upgrade-downgrade story here.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
##########
@@ -31,13 +30,16 @@
  */
 public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable {
 
+  // Update the metadata table due to a COMMIT operation

Review comment:
       javadoc

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -298,28 +306,38 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteSt
                                     String compactionCommitTime) {
     this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
     List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
-    finalizeWrite(table, compactionCommitTime, writeStats);
-    LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
-    SparkCompactHelpers.newInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
-    WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime)
-        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-
-    if (compactionTimer != null) {
-      long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
-      try {
-        metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(),
-            durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
-      } catch (ParseException e) {
-        throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
-            + config.getBasePath() + " at time " + compactionCommitTime, e);
+    try {
+      // TODO: check if we need HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, heartbeatClient, config) here.
+      this.txnManager.beginTransaction(Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime)), Option.empty());

Review comment:
       Let's shrink it to just the part needed within a lock.
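
       A rough sketch of what a narrower critical section could look like, using a plain `ReentrantLock` rather than Hudi's transaction manager. The class `NarrowLockSketch` and the step names are hypothetical, and which steps really must run under the lock is an assumption here: only the metadata table update and the commit that follows it are held inside.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch, not Hudi code: only the steps that must be serialized hold the lock. */
public final class NarrowLockSketch {

  private final ReentrantLock lock = new ReentrantLock();
  private final List<String> events = new ArrayList<>();

  /** Records each step together with whether the lock was held while it ran. */
  List<String> completeCompaction(String instantTime) {
    // Work that does not touch the metadata table stays outside the lock.
    events.add("collect-stats:" + lock.isHeldByCurrentThread());

    lock.lock();
    try {
      // Assumed critical section: metadata update first, then the commit.
      events.add("update-metadata:" + lock.isHeldByCurrentThread());
      events.add("commit:" + lock.isHeldByCurrentThread());
    } finally {
      // Always release, even if the update or commit throws.
      lock.unlock();
    }

    // Cleanup and metrics run after the lock is released.
    events.add("metrics:" + lock.isHeldByCurrentThread());
    return events;
  }
}
```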

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -339,6 +357,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteSt
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
     HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
+    table.getHoodieView().sync();

Review comment:
       More instances to clean up?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -273,6 +275,7 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
   public void rollbackFailedBootstrap() {
     LOG.info("Rolling back pending bootstrap if present");
     HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
+    table.getHoodieView().sync();

Review comment:
       can this call getTableAnd.... ?



