Posted to commits@hudi.apache.org by co...@apache.org on 2022/04/02 11:01:45 UTC

[hudi] branch master updated: [HUDI-3451] Delete metadata table when the write client disables MDT (#5186)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 020786a  [HUDI-3451] Delete metadata table when the write client disables MDT (#5186)
020786a is described below

commit 020786a5f9d25bf140decf24d65e07dd738e4f9d
Author: YueZhang <69...@users.noreply.github.com>
AuthorDate: Sat Apr 2 19:01:06 2022 +0800

    [HUDI-3451] Delete metadata table when the write client disables MDT (#5186)
    
    * Add checks for metadata table init to avoid a possible out-of-sync state
    
    * Revise the logic to reuse existing table config
    
    * Revise docs and naming
    
    Co-authored-by: yuezhang <yu...@freewheel.tv>
    Co-authored-by: Y Ethan Guo <et...@gmail.com>
---
 .../java/org/apache/hudi/table/HoodieTable.java    | 55 ++++++++++++--
 .../org/apache/hudi/table/HoodieFlinkTable.java    |  4 +-
 .../org/apache/hudi/table/HoodieSparkTable.java    |  2 +
 .../functional/TestHoodieBackedMetadata.java       | 84 +++++++++++++++++++---
 .../client/functional/TestHoodieMetadataBase.java  |  5 ++
 .../hudi/common/table/HoodieTableConfig.java       |  2 +-
 6 files changed, 136 insertions(+), 16 deletions(-)
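
For reference, the writer-side switch that drives this behavior is hoodie.metadata.enable,
surfaced through HoodieMetadataConfig. A minimal sketch of a write config that disables the
metadata table, mirroring the test added in this commit (`basePath` is a placeholder for an
existing table path):

    import org.apache.hudi.common.config.HoodieMetadataConfig;
    import org.apache.hudi.config.HoodieWriteConfig;

    // Sketch: disable the metadata table for subsequent writes. On the next write,
    // the data table deletes .hoodie/metadata and clears the table config entry.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
        .build();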

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index f52d46a..ae06e6b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -18,11 +18,6 @@
 
 package org.apache.hudi.table;
 
-import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -50,6 +45,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -64,6 +60,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -80,6 +77,12 @@ import org.apache.hudi.table.marker.WriteMarkers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.table.storage.HoodieLayoutFactory;
 import org.apache.hudi.table.storage.HoodieStorageLayout;
+
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -802,6 +805,48 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
     return Option.empty();
   }
 
+  /**
+   * Deletes the metadata table if the writer disables the metadata table with hoodie.metadata.enable=false.
+   */
+  public void maybeDeleteMetadataTable() {
+    if (shouldExecuteMetadataTableDeletion()) {
+      try {
+        Path mdtBasePath = new Path(HoodieTableMetadata.getMetadataTableBasePath(config.getBasePath()));
+        FileSystem fileSystem = metaClient.getFs();
+        if (fileSystem.exists(mdtBasePath)) {
+          LOG.info("Deleting metadata table because it is disabled in the writer.");
+          fileSystem.delete(mdtBasePath, true);
+        }
+        clearMetadataTablePartitionsConfig();
+      } catch (IOException ioe) {
+        throw new HoodieIOException("Failed to delete metadata table.", ioe);
+      }
+    }
+  }
+
+  private boolean shouldExecuteMetadataTableDeletion() {
+    // Only execute metadata table deletion when all the following conditions are met
+    // (1) This is the data table, not the metadata table itself
+    // (2) The metadata table is disabled in HoodieWriteConfig for the writer
+    // (3) Either `HoodieTableConfig.TABLE_METADATA_PARTITIONS` is absent from the
+    // table config, or it is non-empty, indicating that the metadata table
+    // partitions are ready to use
+    return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath())
+        && !config.isMetadataTableEnabled()
+        && (!metaClient.getTableConfig().contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS)
+        || !StringUtils.isNullOrEmpty(metaClient.getTableConfig().getMetadataPartitions()));
+  }
+
+  /**
+   * Clears hoodie.table.metadata.partitions in hoodie.properties.
+   */
+  private void clearMetadataTablePartitionsConfig() {
+    LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
+    metaClient.getTableConfig().setValue(
+        HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), StringUtils.EMPTY_STRING);
+    HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
+  }
+
   public HoodieTableMetadata getMetadataTable() {
     return this.metadata;
   }
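
The net effect on hoodie.properties: clearMetadataTablePartitionsConfig() rewrites
hoodie.table.metadata.partitions to an empty string rather than removing the key. For
illustration, assuming the common case where only the `files` partition had been initialized:

    # before (metadata table initialized; "files" shown as an illustrative value)
    hoodie.table.metadata.partitions=files
    # after clearMetadataTablePartitionsConfig() runs
    hoodie.table.metadata.partitions=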
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 2f08a55..f749ce4 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.table;
 
-import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.data.HoodieData;
@@ -37,6 +36,8 @@ import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
+import org.apache.avro.specific.SpecificRecordBase;
+
 import java.util.List;
 
 import static org.apache.hudi.common.data.HoodieList.getList;
@@ -107,6 +108,7 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
       return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
           context, actionMetadata, Option.of(triggeringInstantTimestamp)));
     } else {
+      maybeDeleteMetadataTable();
       return Option.empty();
     }
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index ce14d43..9e4bb14 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -122,6 +122,8 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
       } catch (IOException e) {
         throw new HoodieMetadataException("Checking existence of metadata table failed", e);
       }
+    } else {
+      maybeDeleteMetadataTable();
     }
 
     return Option.empty();
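
Both engine-specific tables now route through the same disabled branch of
getMetadataWriter(...). Reduced to the shape of the change (not the verbatim method bodies):

    // Sketch of the common pattern in HoodieSparkTable / HoodieFlinkTable:
    if (config.isMetadataTableEnabled()) {
      // create and return the engine-specific HoodieTableMetadataWriter
      return Option.of(/* ...HoodieBackedTableMetadataWriter.create(...) */);
    } else {
      // new in this commit: delete a stale metadata table left behind by a
      // previous writer that ran with hoodie.metadata.enable=true
      maybeDeleteMetadataTable();
      return Option.empty();
    }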
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index f60de7d..5c73d96 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -18,15 +18,6 @@
 
 package org.apache.hudi.client.functional;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.util.Time;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
@@ -76,6 +67,7 @@ import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -100,6 +92,16 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
 import org.apache.hudi.table.upgrade.UpgradeDowngrade;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.util.Time;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroSchemaConverter;
@@ -114,6 +116,7 @@ import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -198,6 +201,69 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     validateMetadata(testTable, true);
   }
 
+  @Test
+  public void testTurnOffMetadataTableAfterEnable() throws Exception {
+    init(COPY_ON_WRITE, true);
+    String instant1 = "0000001";
+    HoodieCommitMetadata hoodieCommitMetadata = doWriteOperationWithMeta(testTable, instant1, INSERT);
+
+    // Simulate complete data directories, including the ".hoodie_partition_metadata" marker files
+    File metaForP1 = new File(metaClient.getBasePath() + "/p1", ".hoodie_partition_metadata");
+    File metaForP2 = new File(metaClient.getBasePath() + "/p2", ".hoodie_partition_metadata");
+    metaForP1.createNewFile();
+    metaForP2.createNewFile();
+
+    // Sync to metadata table
+    metaClient.reloadActiveTimeline();
+    HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
+    Option metadataWriter = table.getMetadataWriter(instant1, Option.of(hoodieCommitMetadata));
+    validateMetadata(testTable, true);
+
+    assertTrue(metadataWriter.isPresent());
+    HoodieTableConfig hoodieTableConfig =
+        new HoodieTableConfig(this.fs, metaClient.getMetaPath(), writeConfig.getPayloadClass());
+    assertFalse(hoodieTableConfig.getMetadataPartitions().isEmpty());
+
+    // Turn off metadata table
+    HoodieWriteConfig writeConfig2 = HoodieWriteConfig.newBuilder()
+        .withProperties(this.writeConfig.getProps())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .build();
+    testTable = HoodieTestTable.of(metaClient);
+    String instant2 = "0000002";
+    HoodieCommitMetadata hoodieCommitMetadata2 = doWriteOperationWithMeta(testTable, instant2, INSERT);
+    metaClient.reloadActiveTimeline();
+    HoodieTable table2 = HoodieSparkTable.create(writeConfig2, context, metaClient);
+    Option metadataWriter2 = table2.getMetadataWriter(instant2, Option.of(hoodieCommitMetadata2));
+    assertFalse(metadataWriter2.isPresent());
+
+    HoodieTableConfig hoodieTableConfig2 =
+        new HoodieTableConfig(this.fs, metaClient.getMetaPath(), writeConfig2.getPayloadClass());
+    assertEquals(StringUtils.EMPTY_STRING, hoodieTableConfig2.getMetadataPartitions());
+    // Assert metadata table folder is deleted
+    assertFalse(metaClient.getFs().exists(
+        new Path(HoodieTableMetadata.getMetadataTableBasePath(writeConfig2.getBasePath()))));
+
+    // Enable metadata table again and initialize metadata table through
+    // HoodieTable.getMetadataWriter() function
+    HoodieWriteConfig writeConfig3 = HoodieWriteConfig.newBuilder()
+        .withProperties(this.writeConfig.getProps())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
+        .build();
+    testTable = HoodieTestTable.of(metaClient);
+    metaClient.reloadActiveTimeline();
+    String instant3 = "0000003";
+    HoodieCommitMetadata hoodieCommitMetadata3 = doWriteOperationWithMeta(testTable, instant3, INSERT);
+    metaClient.reloadActiveTimeline();
+    HoodieTable table3 = HoodieSparkTable.create(writeConfig3, context, metaClient);
+    Option metadataWriter3 = table3.getMetadataWriter(instant3, Option.of(hoodieCommitMetadata3));
+    validateMetadata(testTable, true);
+    assertTrue(metadataWriter3.isPresent());
+    HoodieTableConfig hoodieTableConfig3 =
+        new HoodieTableConfig(this.fs, metaClient.getMetaPath(), writeConfig.getPayloadClass());
+    assertFalse(hoodieTableConfig3.getMetadataPartitions().isEmpty());
+  }
+
   /**
    * Only valid partition directories are added to the metadata.
    */
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
index f00a0b8..93d4ac5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.HoodieTimelineArchiver;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
@@ -176,6 +177,10 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
     testTable.doWriteOperation(commitTime, operationType, emptyList(), asList("p1", "p2"), 3);
   }
 
+  protected HoodieCommitMetadata doWriteOperationWithMeta(HoodieTestTable testTable, String commitTime, WriteOperationType operationType) throws Exception {
+    return testTable.doWriteOperation(commitTime, operationType, emptyList(), asList("p1", "p2"), 3);
+  }
+
   protected void doClean(HoodieTestTable testTable, String commitTime, List<String> commitsToClean) throws IOException {
     doCleanInternal(testTable, commitTime, commitsToClean, false);
   }
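
The new helper mirrors doWriteOperation but surfaces the resulting HoodieCommitMetadata,
which the test above feeds into HoodieTable.getMetadataWriter(...). A usage sketch drawn
from that test:

    HoodieCommitMetadata meta = doWriteOperationWithMeta(testTable, "0000001", INSERT);
    HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
    // raw Option, as in the test above
    Option metadataWriter = table.getMetadataWriter("0000001", Option.of(meta));
    assertTrue(metadataWriter.isPresent()); // metadata table enabled in writeConfig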
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 923ee27..cfb0df3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -607,7 +607,7 @@ public class HoodieTableConfig extends HoodieConfig {
   public String getMetadataPartitions() {
     return getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING);
   }
-
+  
   public Map<String, String> propsMap() {
     return props.entrySet().stream()
         .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));