Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/03/02 19:57:50 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #4739: [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS

nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r818036625



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -1301,4 +1369,33 @@ public void close() {
     this.heartbeatClient.stop();
     this.txnManager.close();
   }
+
+  private void setWriteTimer(HoodieTable<T, I, K, O> table) {
+    String commitType = table.getMetaClient().getCommitActionType();
+    if (commitType.equals(HoodieTimeline.COMMIT_ACTION)) {
+      writeTimer = metrics.getCommitCtx();
+    } else if (commitType.equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
+      writeTimer = metrics.getDeltaCommitCtx();
+    }
+  }
+
+  private void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
+    UpgradeDowngrade upgradeDowngrade =
+        new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper);
+
+    if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {

Review comment:
       As per master, this code is available only in the Spark engine; Flink and Java do not have it. Even if we wish to unify, I would do that in a separate patch and get it reviewed by experts who have worked on it. Can you move this to SparkRDDWriteClient for now?
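       If it helps, here is a rough sketch of the suggested relocation (hypothetical, not the actual patch; it assumes SparkRDDWriteClient overrides doInitTable, that a SparkUpgradeDowngradeHelper counterpart to the Flink/Java helpers exists, and that createTable(metaClient) stands in for whatever Spark-specific table instantiation the client already performs):

           // Hypothetical sketch inside SparkRDDWriteClient -- keeps upgrade/downgrade Spark-only, as on master
           @Override
           protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> doInitTable(
               HoodieTableMetaClient metaClient, Option<String> instantTime) {
             new UpgradeDowngrade(metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance())
                 .run(HoodieTableVersion.current(), instantTime.orElse(null));
             // Placeholder: the actual Spark table instantiation is not shown in this diff
             return createTable(metaClient);
           }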

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -1246,17 +1243,88 @@ public HoodieMetrics getMetrics() {
   }
 
   /**
-   * Get HoodieTable and init {@link Timer.Context}.
+   * Instantiates engine-specific instance of {@link HoodieTable} as well as performs necessary
+   * bootstrapping operations (for ex, validating whether Metadata Table has to be bootstrapped)
    *
-   * @param operationType write operation type
+   * NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID ANY OPERATIONS
+   *       NOT REQUIRING EXTERNAL SYNCHRONIZATION
+   *
+   * @param metaClient instance of {@link HoodieTableMetaClient}
    * @param instantTime current inflight instant time
-   * @return HoodieTable
+   * @return instantiated {@link HoodieTable}
    */
-  protected abstract HoodieTable<T, I, K, O> getTableAndInitCtx(WriteOperationType operationType, String instantTime);
+  protected abstract HoodieTable<T, I, K, O> doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime);
 
   /**
-   * Sets write schema from last instant since deletes may not have schema set in the config.
+   * Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping
+   * operations such as:
+   *
+   * NOTE: This method is engine-agnostic and SHOULD NOT be overloaded, please check on
+   * {@link #doInitTable(HoodieTableMetaClient, Option<String>)} instead
+   *
+   * <ul>
+   *   <li>Checking whether upgrade/downgrade is required</li>
+   *   <li>Bootstrapping Metadata Table (if required)</li>
+   *   <li>Initializing metrics contexts</li>
+   * </ul>
    */
+  protected final HoodieTable<T, I, K, O> initTable(WriteOperationType operationType, Option<String> instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    // Setup write schemas for deletes
+    if (operationType == WriteOperationType.DELETE) {
+      setWriteSchemaForDeletes(metaClient);
+    }
+
+    HoodieTable<T, I, K, O> table;
+
+    this.txnManager.beginTransaction();
+    try {
+      tryUpgrade(metaClient, instantTime);
+      table = doInitTable(metaClient, instantTime);
+    } finally {
+      this.txnManager.endTransaction();
+    }
+
+    // Validate table properties
+    metaClient.validateTableProperties(config.getProps(), operationType);
+    // Make sure that FS View is in sync
+    table.getHoodieView().sync();
+
+    switch (operationType) {
+      case INSERT:
+      case INSERT_PREPPED:
+      case UPSERT:
+      case UPSERT_PREPPED:
+      case BULK_INSERT:
+      case BULK_INSERT_PREPPED:
+      case INSERT_OVERWRITE:
+      case INSERT_OVERWRITE_TABLE:
+        setWriteTimer(table);
+        break;
+      case CLUSTER:
+        clusteringTimer = metrics.getClusteringCtx();
+        break;
+      case COMPACT:
+        compactionTimer = metrics.getCompactionCtx();
+        break;
+      default:
+    }
+
+    return table;
+  }
+
+  protected <R> R withLock(Supplier<R> s) {

Review comment:
       Is this used anywhere?
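       If it is intended to be used, one plausible shape (purely illustrative, mirroring the txnManager begin/end pattern in initTable above; not the patch's actual body or call site) would be:

           // Illustrative sketch only -- guards an arbitrary operation with the transaction manager
           protected <R> R withLock(Supplier<R> s) {
             this.txnManager.beginTransaction();
             try {
               return s.get();
             } finally {
               this.txnManager.endTransaction();
             }
           }

           // Hypothetical call site:
           // HoodieTable<T, I, K, O> table = withLock(() -> doInitTable(metaClient, instantTime));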

##########
File path: hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/upgrade/JavaUpgradeDowngradeHelper.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.table.HoodieJavaTable;
+import org.apache.hudi.table.HoodieTable;
+
+/**
+ * Java upgrade and downgrade helper
+ */
+public class JavaUpgradeDowngradeHelper implements SupportsUpgradeDowngrade {

Review comment:
       If I am not wrong, Java did not have any upgrade/downgrade step prior to this patch, and now we are adding it, is that right?

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -392,28 +405,53 @@ private HoodieMetadataColumnStats combineColumnStatsMetadata(HoodieMetadataPaylo
     return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
   }
 
-  private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+  private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
     Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+
+    // First, add all files listed in the previous record
     if (previousRecord.filesystemMetadata != null) {
       combinedFileInfo.putAll(previousRecord.filesystemMetadata);
     }
 
+    // Second, merge in the files listed in the new record
     if (filesystemMetadata != null) {
-      filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
-        } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
-        }
+      validatePayload(type, filesystemMetadata);
+
+      filesystemMetadata.forEach((key, fileInfo) -> {
+        combinedFileInfo.merge(key, fileInfo,
+            // Combine previous record w/ the new one, new records taking precedence over
+            // the old one
+            //
+            // NOTE: That if previous listing contains the file that is being deleted by the tombstone
+            //       record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
+            //       listing as well as drop the tombstone itself.
+            //       However, if file is not present in the previous record we have to persist tombstone
+            //       record in the listing to make sure we carry forward information that this file
+            //       was deleted. This special case could occur since the merging flow is 2-stage:
+            //          - First we merge records from all of the delta log-files
+            //          - Then we merge records from base-files with the delta ones (coming as a result
+            //          of the previous step)
+            (oldFileInfo, newFileInfo) ->
+                // NOTE: We can’t assume that MT update records will be ordered the same way as actual
+                //       FS operations (since they are not atomic), therefore MT record merging should be a
+                //       _commutative_ & _associative_ operation (ie one that would work even in case records
+                //       will get re-ordered), which is
+                //          - Possible for file-sizes (since file-sizes will ever grow, we can simply
+                //          take max of the old and new records)
+                //          - Not possible for is-deleted flags*
+                //
+                //       *However, we’re assuming that the case of concurrent write and deletion of the same
+                //       file is _impossible_ -- it would only be possible with concurrent upsert and
+                //       rollback operation (affecting the same log-file), which is implausible, b/c either
+                //       of the following have to be true:
+                //          - We’re appending to failed log-file (then the other writer is trying to
+                //          rollback it concurrently, before its own write)
+                //          - Rollback (of completed instant) is running concurrently with append (meaning
+                //          that restore is running concurrently with a write, which is also not supported

Review comment:
       Thanks for the comments. These will definitely be useful even for us down the line.
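       For anyone reading this later, here is a tiny standalone sketch of the order-independent (commutative and associative) size merge described in the comments (hypothetical names; the real merge works on HoodieMetadataFileInfo inside combineFileSystemMetadata):

           import java.util.HashMap;
           import java.util.Map;

           // Since file sizes only grow, taking the max yields the same result no matter
           // in which order the metadata-table records are applied.
           final class FileSizeMergeSketch {
             static Map<String, Long> merge(Map<String, Long> previous, Map<String, Long> update) {
               Map<String, Long> combined = new HashMap<>(previous);
               update.forEach((file, size) -> combined.merge(file, size, Math::max));
               return combined;
             }

             public static void main(String[] args) {
               Map<String, Long> a = new HashMap<>();
               a.put("f1.log", 100L);
               Map<String, Long> b = new HashMap<>();
               b.put("f1.log", 250L); // a later append observed out of order
               // merge(a, b) and merge(b, a) both report f1.log -> 250
               System.out.println(merge(a, b));
             }
           }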

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -291,15 +302,15 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
     if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
       throw new HoodieException("Cannot bootstrap the table in multi-writer mode");
     }
-    HoodieTable<T, I, K, O> table = getTableAndInitCtx(WriteOperationType.UPSERT, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS);
+    HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UPSERT, Option.ofNullable(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS));
     rollbackFailedBootstrap();
     table.bootstrap(context, extraMetadata);
   }
 
   /**
    * Main API to rollback failed bootstrap.
    */
-  public void rollbackFailedBootstrap() {

Review comment:
       Why move it from public to protected? We can leave it as is; I don't think anyone outside would have their own write client and implement these public methods. But curious why we are changing it now?

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -397,11 +396,11 @@ public void completeCompaction(
   }
 
   @Override
-  protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
-    HoodieTableMetaClient metaClient = createMetaClient(true);
+  protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
     new UpgradeDowngrade(metaClient, config, context, FlinkUpgradeDowngradeHelper.getInstance())
-        .run(HoodieTableVersion.current(), instantTime);
-    return getTableAndInitCtx(metaClient, operationType);
+        .run(HoodieTableVersion.current(), instantTime.orElse(null));

Review comment:
       Are we not calling upgrade/downgrade twice?
   Once in tryUpgrade() and once here within doInitTable()?
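       If it does run twice, a minimal sketch of the deduplicated Flink override (assuming tryUpgrade() in BaseHoodieWriteClient.initTable() becomes the single place that runs UpgradeDowngrade; createTable(metaClient) is a placeholder since the rest of the method is not in this diff) would be:

           @Override
           protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> doInitTable(
               HoodieTableMetaClient metaClient, Option<String> instantTime) {
             // No UpgradeDowngrade call here; the base class already performed it under lock in tryUpgrade()
             return createTable(metaClient); // placeholder for the Flink-specific table instantiation
           }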




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org