You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2022/04/02 11:01:45 UTC
[hudi] branch master updated: [HUDI-3451] Delete metadata table when the write client disables MDT (#5186)
This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 020786a [HUDI-3451] Delete metadata table when the write client disables MDT (#5186)
020786a is described below
commit 020786a5f9d25bf140decf24d65e07dd738e4f9d
Author: YueZhang <69...@users.noreply.github.com>
AuthorDate: Sat Apr 2 19:01:06 2022 +0800
[HUDI-3451] Delete metadata table when the write client disables MDT (#5186)
* Add checks for metadata table init to avoid possible out-of-sync
* Revise the logic to reuse existing table config
* Revise docs and naming
Co-authored-by: yuezhang <yu...@freewheel.tv>
Co-authored-by: Y Ethan Guo <et...@gmail.com>
---
.../java/org/apache/hudi/table/HoodieTable.java | 55 ++++++++++++--
.../org/apache/hudi/table/HoodieFlinkTable.java | 4 +-
.../org/apache/hudi/table/HoodieSparkTable.java | 2 +
.../functional/TestHoodieBackedMetadata.java | 84 +++++++++++++++++++---
.../client/functional/TestHoodieMetadataBase.java | 5 ++
.../hudi/common/table/HoodieTableConfig.java | 2 +-
6 files changed, 136 insertions(+), 16 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index f52d46a..ae06e6b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -18,11 +18,6 @@
package org.apache.hudi.table;
-import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -50,6 +45,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -64,6 +60,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -80,6 +77,12 @@ import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.storage.HoodieLayoutFactory;
import org.apache.hudi.table.storage.HoodieStorageLayout;
+
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -802,6 +805,48 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
return Option.empty();
}
+ /**
+ * Deletes the metadata table if the writer disables the metadata table with hoodie.metadata.enable=false.
+ */
+ public void maybeDeleteMetadataTable() {
+ if (shouldExecuteMetadataTableDeletion()) {
+ try {
+ Path mdtBasePath = new Path(HoodieTableMetadata.getMetadataTableBasePath(config.getBasePath()));
+ FileSystem fileSystem = metaClient.getFs();
+ if (fileSystem.exists(mdtBasePath)) {
+ LOG.info("Deleting metadata table because it is disabled in writer.");
+ fileSystem.delete(mdtBasePath, true);
+ }
+ clearMetadataTablePartitionsConfig();
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Failed to delete metadata table.", ioe);
+ }
+ }
+ }
+
+ private boolean shouldExecuteMetadataTableDeletion() {
+ // Only execute metadata table deletion when all the following conditions are met
+ // (1) This is data table
+ // (2) Metadata table is disabled in HoodieWriteConfig for the writer
+ // (3) Check `HoodieTableConfig.TABLE_METADATA_PARTITIONS`. Either the table config
+ // does not exist, or the table config is non-empty indicating that metadata table
+ // partitions are ready to use
+ return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath())
+ && !config.isMetadataTableEnabled()
+ && (!metaClient.getTableConfig().contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS)
+ || !StringUtils.isNullOrEmpty(metaClient.getTableConfig().getMetadataPartitions()));
+ }
+
+ /**
+ * Clears hoodie.table.metadata.partitions in hoodie.properties
+ */
+ private void clearMetadataTablePartitionsConfig() {
+ LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
+ metaClient.getTableConfig().setValue(
+ HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), StringUtils.EMPTY_STRING);
+ HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
+ }
+
public HoodieTableMetadata getMetadataTable() {
return this.metadata;
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 2f08a55..f749ce4 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -18,7 +18,6 @@
package org.apache.hudi.table;
-import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.data.HoodieData;
@@ -37,6 +36,8 @@ import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.avro.specific.SpecificRecordBase;
+
import java.util.List;
import static org.apache.hudi.common.data.HoodieList.getList;
@@ -107,6 +108,7 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
context, actionMetadata, Option.of(triggeringInstantTimestamp)));
} else {
+ maybeDeleteMetadataTable();
return Option.empty();
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index ce14d43..9e4bb14 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -122,6 +122,8 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
} catch (IOException e) {
throw new HoodieMetadataException("Checking existence of metadata table failed", e);
}
+ } else {
+ maybeDeleteMetadataTable();
}
return Option.empty();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index f60de7d..5c73d96 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -18,15 +18,6 @@
package org.apache.hudi.client.functional;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.util.Time;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
@@ -76,6 +67,7 @@ import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -100,6 +92,16 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.util.Time;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
@@ -114,6 +116,7 @@ import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
+import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -198,6 +201,69 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
validateMetadata(testTable, true);
}
+ @Test
+ public void testTurnOffMetadataTableAfterEnable() throws Exception {
+ init(COPY_ON_WRITE, true);
+ String instant1 = "0000001";
+ HoodieCommitMetadata hoodieCommitMetadata = doWriteOperationWithMeta(testTable, instant1, INSERT);
+
+ // Simulate the complete data directory including ".hoodie_partition_metadata" file
+ File metaForP1 = new File(metaClient.getBasePath() + "/p1",".hoodie_partition_metadata");
+ File metaForP2 = new File(metaClient.getBasePath() + "/p2",".hoodie_partition_metadata");
+ metaForP1.createNewFile();
+ metaForP2.createNewFile();
+
+ // Sync to metadata table
+ metaClient.reloadActiveTimeline();
+ HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
+ Option metadataWriter = table.getMetadataWriter(instant1, Option.of(hoodieCommitMetadata));
+ validateMetadata(testTable, true);
+
+ assertTrue(metadataWriter.isPresent());
+ HoodieTableConfig hoodieTableConfig =
+ new HoodieTableConfig(this.fs, metaClient.getMetaPath(), writeConfig.getPayloadClass());
+ assertFalse(hoodieTableConfig.getMetadataPartitions().isEmpty());
+
+ // Turn off metadata table
+ HoodieWriteConfig writeConfig2 = HoodieWriteConfig.newBuilder()
+ .withProperties(this.writeConfig.getProps())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+ .build();
+ testTable = HoodieTestTable.of(metaClient);
+ String instant2 = "0000002";
+ HoodieCommitMetadata hoodieCommitMetadata2 = doWriteOperationWithMeta(testTable, instant2, INSERT);
+ metaClient.reloadActiveTimeline();
+ HoodieTable table2 = HoodieSparkTable.create(writeConfig2, context, metaClient);
+ Option metadataWriter2 = table2.getMetadataWriter(instant2, Option.of(hoodieCommitMetadata2));
+ assertFalse(metadataWriter2.isPresent());
+
+ HoodieTableConfig hoodieTableConfig2 =
+ new HoodieTableConfig(this.fs, metaClient.getMetaPath(), writeConfig2.getPayloadClass());
+ assertEquals(StringUtils.EMPTY_STRING, hoodieTableConfig2.getMetadataPartitions());
+ // Assert metadata table folder is deleted
+ assertFalse(metaClient.getFs().exists(
+ new Path(HoodieTableMetadata.getMetadataTableBasePath(writeConfig2.getBasePath()))));
+
+ // Enable metadata table again and initialize metadata table through
+ // HoodieTable.getMetadataWriter() function
+ HoodieWriteConfig writeConfig3 = HoodieWriteConfig.newBuilder()
+ .withProperties(this.writeConfig.getProps())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
+ .build();
+ testTable = HoodieTestTable.of(metaClient);
+ metaClient.reloadActiveTimeline();
+ String instant3 = "0000003";
+ HoodieCommitMetadata hoodieCommitMetadata3 = doWriteOperationWithMeta(testTable, instant3, INSERT);
+ metaClient.reloadActiveTimeline();
+ HoodieTable table3 = HoodieSparkTable.create(writeConfig3, context, metaClient);
+ Option metadataWriter3 = table3.getMetadataWriter(instant3, Option.of(hoodieCommitMetadata3));
+ validateMetadata(testTable, true);
+ assertTrue(metadataWriter3.isPresent());
+ HoodieTableConfig hoodieTableConfig3 =
+ new HoodieTableConfig(this.fs, metaClient.getMetaPath(), writeConfig.getPayloadClass());
+ assertFalse(hoodieTableConfig3.getMetadataPartitions().isEmpty());
+ }
+
/**
* Only valid partition directories are added to the metadata.
*/
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
index f00a0b8..93d4ac5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.HoodieTimelineArchiver;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
@@ -176,6 +177,10 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
testTable.doWriteOperation(commitTime, operationType, emptyList(), asList("p1", "p2"), 3);
}
+ protected HoodieCommitMetadata doWriteOperationWithMeta(HoodieTestTable testTable, String commitTime, WriteOperationType operationType) throws Exception {
+ return testTable.doWriteOperation(commitTime, operationType, emptyList(), asList("p1", "p2"), 3);
+ }
+
protected void doClean(HoodieTestTable testTable, String commitTime, List<String> commitsToClean) throws IOException {
doCleanInternal(testTable, commitTime, commitsToClean, false);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 923ee27..cfb0df3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -607,7 +607,7 @@ public class HoodieTableConfig extends HoodieConfig {
public String getMetadataPartitions() {
return getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING);
}
-
+
public Map<String, String> propsMap() {
return props.entrySet().stream()
.collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));