Posted to commits@hudi.apache.org by da...@apache.org on 2022/05/24 05:08:01 UTC

[hudi] branch master updated: [HUDI-4138] Fix the concurrent modification of hoodie table config for Flink (#5660)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 676d5cefe0 [HUDI-4138] Fix the concurrent modification of hoodie table config for Flink (#5660)
676d5cefe0 is described below

commit 676d5cefe0294f495db04ec6476afdc00cbf0dd2
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Tue May 24 13:07:55 2022 +0800

    [HUDI-4138] Fix the concurrent modification of hoodie table config for Flink (#5660)
    
    * Remove the metadata cleaning strategy for Flink, which means the multi-modal index may be affected
    * Improve HoodieTable#clearMetadataTablePartitionsConfig so it only updates the table config when necessary
    * Remove the modification of the read code path in HoodieTableConfig
---
 .../main/java/org/apache/hudi/table/HoodieTable.java  | 14 ++++++--------
 .../apache/hudi/client/HoodieFlinkWriteClient.java    |  4 +---
 .../java/org/apache/hudi/table/HoodieFlinkTable.java  |  4 ----
 .../apache/hudi/common/table/HoodieTableConfig.java   | 19 ++++++-------------
 4 files changed, 13 insertions(+), 28 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 807865dae2..56526d23db 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -885,24 +885,22 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
     // partitions are ready to use
     return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath())
         && !config.isMetadataTableEnabled()
-        && (!metaClient.getTableConfig().contains(TABLE_METADATA_PARTITIONS)
-        || !metaClient.getTableConfig().getMetadataPartitions().isEmpty());
+        && !metaClient.getTableConfig().getMetadataPartitions().isEmpty();
   }
 
   /**
    * Clears hoodie.table.metadata.partitions in hoodie.properties
    */
   private void clearMetadataTablePartitionsConfig(Option<MetadataPartitionType> partitionType, boolean clearAll) {
-    if (clearAll) {
+    Set<String> partitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
+    if (clearAll && partitions.size() > 0) {
       LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
       metaClient.getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(), EMPTY_STRING);
       HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
-      return;
+    } else if (partitions.remove(partitionType.get().getPartitionPath())) {
+      metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", partitions));
+      HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
     }
-    Set<String> completedPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
-    completedPartitions.remove(partitionType.get().getPartitionPath());
-    metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions));
-    HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
   }
 
   public HoodieTableMetadata getMetadataTable() {
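
The rewritten clearMetadataTablePartitionsConfig above persists hoodie.properties only when the set of completed metadata partitions actually changes, so concurrent no-op callers no longer rewrite the file. A minimal, self-contained sketch of that guard, using hypothetical names rather than Hudi's classes:

    import java.util.LinkedHashSet;
    import java.util.Set;
    import java.util.function.Consumer;

    public class PartitionConfigSketch {
      // Mirrors the guard above: persist only when the stored value would change.
      static void clearPartitions(Set<String> completed, String partitionToDrop,
                                  boolean clearAll, Consumer<String> persist) {
        if (clearAll && !completed.isEmpty()) {
          persist.accept("");                           // wipe the config value once
        } else if (completed.remove(partitionToDrop)) {
          persist.accept(String.join(",", completed));  // write only when something was removed
        }
        // otherwise nothing changed, so the properties file is not rewritten at all
      }

      public static void main(String[] args) {
        Set<String> completed = new LinkedHashSet<>(Set.of("files", "column_stats"));
        clearPartitions(completed, "bloom_filters", false,
            v -> System.out.println("persist: " + v));  // prints nothing: no change
        clearPartitions(completed, "column_stats", false,
            v -> System.out.println("persist: " + v));  // prints "persist: files"
      }
    }
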
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 2d23c3afb7..49fa2ec246 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -53,7 +53,6 @@ import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.io.MiniBatchHandle;
 import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
-import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
@@ -365,8 +364,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
       // commit to data table after committing to metadata table.
       // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
       // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
-      table.getMetadataWriter(compactionInstant.getTimestamp()).ifPresent(
-          w -> ((HoodieTableMetadataWriter) w).update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction())));
+      writeTableMetadata(table, compactionCommitTime, compactionInstant.getAction(), metadata);
       LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
       CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
     } finally {
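
As the surrounding comment notes, all metadata-table writes are meant to happen within a single lock, and the change routes the compaction commit through the shared writeTableMetadata helper instead of driving the metadata writer directly. A rough sketch of that single-entry-point pattern, with hypothetical names and a plain ReentrantLock standing in for Hudi's lock provider:

    import java.util.concurrent.locks.ReentrantLock;

    public class MetadataCommitSketch {
      private final ReentrantLock metadataLock = new ReentrantLock();

      // Single entry point: regular writes and table-service commits (compaction,
      // clustering) all funnel through here rather than each holding its own writer.
      void writeTableMetadata(String instantTime, String action, Object commitMetadata) {
        metadataLock.lock();
        try {
          // apply commitMetadata to the metadata table for instantTime here
          System.out.printf("metadata table updated for %s (%s)%n", instantTime, action);
        } finally {
          metadataLock.unlock();
        }
      }

      public static void main(String[] args) {
        new MetadataCommitSketch().writeTableMetadata("20220524130755", "commit", new Object());
      }
    }
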
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index f1e43b9d30..6eae15e7e1 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -105,13 +105,9 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
   public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
                                                                                             Option<T> actionMetadata) {
     if (config.isMetadataTableEnabled()) {
-      // even with metadata enabled, some index could have been disabled
-      // delete metadata partitions corresponding to such indexes
-      deleteMetadataIndexIfNecessary();
       return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
           context, actionMetadata, Option.of(triggeringInstantTimestamp)));
     } else {
-      maybeDeleteMetadataTable();
       return Option.empty();
     }
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index edc6caa5bc..886911466b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -272,8 +272,8 @@ public class HoodieTableConfig extends HoodieConfig {
    * @throws IOException
    */
   private static String storeProperties(Properties props, FSDataOutputStream outputStream) throws IOException {
-    String checksum;
-    if (props.containsKey(TABLE_CHECKSUM.key()) && validateChecksum(props)) {
+    final String checksum;
+    if (isValidChecksum(props)) {
       checksum = props.getProperty(TABLE_CHECKSUM.key());
       props.store(outputStream, "Updated at " + Instant.now());
     } else {
@@ -285,8 +285,8 @@ public class HoodieTableConfig extends HoodieConfig {
     return checksum;
   }
 
-  private boolean isValidChecksum() {
-    return contains(TABLE_CHECKSUM) && validateChecksum(props);
+  private static boolean isValidChecksum(Properties props) {
+    return props.containsKey(TABLE_CHECKSUM.key()) && validateChecksum(props);
   }
 
   /**
@@ -298,20 +298,13 @@ public class HoodieTableConfig extends HoodieConfig {
 
   private void fetchConfigs(FileSystem fs, String metaPath) throws IOException {
     Path cfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
-    Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP);
     try (FSDataInputStream is = fs.open(cfgPath)) {
       props.load(is);
-      // validate checksum for latest table version
-      if (getTableVersion().versionCode() >= HoodieTableVersion.FOUR.versionCode() && !isValidChecksum()) {
-        LOG.warn("Checksum validation failed. Falling back to backed up configs.");
-        try (FSDataInputStream fsDataInputStream = fs.open(backupCfgPath)) {
-          props.load(fsDataInputStream);
-        }
-      }
     } catch (IOException ioe) {
       if (!fs.exists(cfgPath)) {
         LOG.warn("Run `table recover-configs` if config update/delete failed midway. Falling back to backed up configs.");
         // try the backup. this way no query ever fails if update fails midway.
+        Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP);
         try (FSDataInputStream is = fs.open(backupCfgPath)) {
           props.load(is);
         }
@@ -631,7 +624,7 @@ public class HoodieTableConfig extends HoodieConfig {
         CONFIG_VALUES_DELIMITER
     );
   }
-  
+
   /**
    * Returns the format to use for partition meta files.
    */
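
With the checksum validation removed from fetchConfigs, the read path falls back to the backup properties file only when the primary file cannot be read, instead of also re-reading the backup on a checksum mismatch. A simplified sketch of that read path, using plain java.nio and local paths (the file names here are assumptions, not Hudi's constants):

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Properties;

    public class FetchConfigsSketch {
      static Properties fetchConfigs(Path metaPath) throws IOException {
        Path cfgPath = metaPath.resolve("hoodie.properties");
        Path backupCfgPath = metaPath.resolve("hoodie.properties.backup");
        Properties props = new Properties();
        try (InputStream is = Files.newInputStream(cfgPath)) {
          props.load(is);  // happy path: no checksum re-validation on read
        } catch (IOException ioe) {
          if (!Files.exists(cfgPath)) {
            // same fallback as above: if an update died midway, read the backup copy
            try (InputStream is = Files.newInputStream(backupCfgPath)) {
              props.load(is);
            }
          } else {
            throw ioe;
          }
        }
        return props;
      }

      public static void main(String[] args) throws IOException {
        System.out.println(fetchConfigs(Path.of("/tmp/hoodie_table/.hoodie")).stringPropertyNames());
      }
    }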