Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/19 10:30:18 UTC

[hudi] 10/11: [HUDI-3899] Drop index to delete pending index instants from timeline if applicable (#5342)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 46deea05c402e986395875b35f8bdab6cd8bbda5
Author: Sagar Sumit <sa...@gmail.com>
AuthorDate: Tue Apr 19 07:58:46 2022 +0530

    [HUDI-3899] Drop index to delete pending index instants from timeline if applicable (#5342)
    
    
    Co-authored-by: sivabalan <n....@gmail.com>
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |  39 ++++-
 .../org/apache/hudi/utilities/HoodieIndexer.java   |   2 +-
 .../apache/hudi/utilities/TestHoodieIndexer.java   | 176 +++++++++++++++++++--
 .../indexer-only-bloom.properties                  |  25 +++
 4 files changed, 225 insertions(+), 17 deletions(-)
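
In short, this change makes dropping a metadata index partition also clean up any pending indexing instants (indexing.requested / indexing.inflight) for that partition, so a half-scheduled index cannot linger on the timeline after its partition is gone. A minimal sketch of driving the drop through HoodieIndexer, assuming a live JavaSparkContext jsc and an existing table (paths here are illustrative; the Config fields mirror the tests in this commit):

    HoodieIndexer.Config config = new HoodieIndexer.Config();
    config.basePath = "/tmp/hudi/indexer_test";          // illustrative base path
    config.tableName = "indexer_test";
    config.indexTypes = "COLUMN_STATS";                  // the index partition to drop
    config.runningMode = "dropindex";                    // HoodieIndexer.DROP_INDEX
    config.propsFilePath = "/path/to/indexer.properties";
    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
    int exitCode = indexer.start(0);                     // 0 on success; pending indexing instants removed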

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index be1aed75e2..d080d14a69 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -18,13 +18,9 @@
 
 package org.apache.hudi.metadata;
 
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
 import org.apache.hudi.avro.model.HoodieInstantInfo;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -70,6 +66,12 @@ import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.exception.HoodieMetadataException;
+
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -88,6 +90,9 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.getIndexInflightInstant;
+import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeIndexPlan;
 import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
 import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
 import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
@@ -727,9 +732,33 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
       HoodieTableConfig.update(dataMetaClient.getFs(), new Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
       LOG.warn("Deleting Metadata Table partitions: " + partitionPath);
       dataMetaClient.getFs().delete(new Path(metadataWriteConfig.getBasePath(), partitionPath), true);
+      // delete corresponding pending indexing instant file in the timeline
+      LOG.warn("Deleting pending indexing instant from the timeline for partition: " + partitionPath);
+      deletePendingIndexingInstant(dataMetaClient, partitionPath);
     }
   }
 
+  /**
+   * Deletes any pending indexing instant, if it exists.
+   * It reads the plan from the indexing.requested file and deletes both the requested and inflight instants
+   * if the partition path in the plan matches the given partition path.
+   */
+  private static void deletePendingIndexingInstant(HoodieTableMetaClient metaClient, String partitionPath) {
+    metaClient.reloadActiveTimeline().filterPendingIndexTimeline().getInstants().filter(instant -> REQUESTED.equals(instant.getState()))
+        .forEach(instant -> {
+          try {
+            HoodieIndexPlan indexPlan = deserializeIndexPlan(metaClient.getActiveTimeline().readIndexPlanAsBytes(instant).get());
+            if (indexPlan.getIndexPartitionInfos().stream()
+                .anyMatch(indexPartitionInfo -> indexPartitionInfo.getMetadataPartitionPath().equals(partitionPath))) {
+              metaClient.getActiveTimeline().deleteInstantFileIfExists(instant);
+              metaClient.getActiveTimeline().deleteInstantFileIfExists(getIndexInflightInstant(instant.getTimestamp()));
+            }
+          } catch (IOException e) {
+            LOG.error("Failed to delete the instant file corresponding to " + instant, e);
+          }
+        });
+  }
+
   private MetadataRecordsGenerationParams getRecordsGenerationParams() {
     return new MetadataRecordsGenerationParams(
         dataMetaClient,
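
To make the requested/inflight pairing in deletePendingIndexingInstant concrete: for a pending index scheduled at instant time t, the timeline holds t.indexing.requested (carrying the serialized HoodieIndexPlan) and, once execution starts, t.indexing.inflight; both files are removed when the plan targets the dropped partition. A hedged sketch against the Hudi 0.11 timeline API (the timestamp and variable names are illustrative; metaClient is the data table's HoodieTableMetaClient):

    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
    // The plan is read from the REQUESTED instant's file before anything is deleted.
    HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED,
        HoodieTimeline.INDEXING_ACTION, "20220419075846");
    // Remove the requested file and its inflight counterpart; both calls are no-ops
    // if the corresponding file is absent.
    activeTimeline.deleteInstantFileIfExists(requested);
    activeTimeline.deleteInstantFileIfExists(HoodieTimeline.getIndexInflightInstant("20220419075846"));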
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
index 501f9296b4..96f6ce38cd 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
@@ -85,7 +85,7 @@ import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
 public class HoodieIndexer {
 
   private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
-  private static final String DROP_INDEX = "dropindex";
+  static final String DROP_INDEX = "dropindex";
 
   private final HoodieIndexer.Config cfg;
   private TypedProperties props;
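
Note the visibility change above: DROP_INDEX goes from private to package-private so that TestHoodieIndexer (below) can import the mode string statically instead of repeating the "dropindex" literal; the SCHEDULE and SCHEDULE_AND_EXECUTE modes were already exposed via UtilHelpers.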
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index 695ca88413..9312a26b4f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -28,14 +28,17 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.testutils.providers.SparkProvider;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -50,10 +53,16 @@ import java.util.List;
 import java.util.Objects;
 
 import static org.apache.hudi.common.table.HoodieTableMetaClient.reload;
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
 import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
 import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
 import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.apache.hudi.utilities.HoodieIndexer.DROP_INDEX;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -81,6 +90,14 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP
     initMetaClient();
   }
 
+  protected void initMetaClient() throws IOException {
+    String rootPathStr = "file://" + tempDir.toAbsolutePath().toString();
+    Path rootPath = new Path(rootPathStr);
+    rootPath.getFileSystem(jsc.hadoopConfiguration()).mkdirs(rootPath);
+    metaClient = HoodieTestUtils.init(rootPathStr, getTableType());
+    basePath = metaClient.getBasePath();
+  }
+
   @Test
   public void testGetRequestedPartitionTypes() {
     HoodieIndexer.Config config = new HoodieIndexer.Config();
@@ -132,29 +149,166 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP
     assertNoWriteErrors(statuses);
 
     // validate table config
-    assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
-    assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
 
     // build indexer config which has only column_stats enabled (files is enabled by default)
     HoodieIndexer.Config config = new HoodieIndexer.Config();
     String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath();
     config.basePath = basePath;
     config.tableName = tableName;
-    config.indexTypes = "COLUMN_STATS";
-    config.runningMode = "scheduleAndExecute";
+    config.indexTypes = COLUMN_STATS.name();
+    config.runningMode = SCHEDULE_AND_EXECUTE;
     config.propsFilePath = propsPath;
     // start the indexer and validate column_stats index is also complete
     HoodieIndexer indexer = new HoodieIndexer(jsc, config);
     assertEquals(0, indexer.start(0));
 
     // validate table config
-    assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
-    assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
-    assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(COLUMN_STATS.getPartitionPath()));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(COLUMN_STATS.getPartitionPath()));
     // validate metadata partitions actually exist
-    assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath, context, FILES));
-    assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath, context, COLUMN_STATS));
-    assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+    assertTrue(metadataPartitionExists(basePath, context, FILES));
+    assertTrue(metadataPartitionExists(basePath, context, COLUMN_STATS));
+    assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+  }
+
+  @Test
+  public void testIndexerDropPartitionDeletesInstantFromTimeline() {
+    initTestDataGenerator();
+    String tableName = "indexer_test";
+    HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath, tableName);
+    // enable files on the regular write client
+    HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
+    HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfigBuilder.build()).build();
+    // do one upsert with synchronous metadata update
+    SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
+    String instant = "0001";
+    writeClient.startCommitWithTime(instant);
+    List<HoodieRecord> records = dataGen.generateInserts(instant, 100);
+    JavaRDD<WriteStatus> result = writeClient.upsert(jsc.parallelize(records, 1), instant);
+    List<WriteStatus> statuses = result.collect();
+    assertNoWriteErrors(statuses);
+
+    // validate partitions built successfully
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, FILES));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+
+    // build indexer config which has only column_stats enabled (files is enabled by default)
+    HoodieIndexer.Config config = new HoodieIndexer.Config();
+    String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath();
+    config.basePath = basePath;
+    config.tableName = tableName;
+    config.indexTypes = COLUMN_STATS.name();
+    config.runningMode = SCHEDULE;
+    config.propsFilePath = propsPath;
+
+    // schedule indexing and validate column_stats index is also initialized
+    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
+    assertEquals(0, indexer.start(0));
+    Option<HoodieInstant> indexInstantInTimeline = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant();
+    assertTrue(indexInstantInTimeline.isPresent());
+    assertEquals(REQUESTED, indexInstantInTimeline.get().getState());
+    assertTrue(metadataPartitionExists(basePath, context, COLUMN_STATS));
+
+    // drop column_stats and validate indexing.requested is also removed from the timeline
+    config.runningMode = DROP_INDEX;
+    indexer = new HoodieIndexer(jsc, config);
+    assertEquals(0, indexer.start(0));
+    indexInstantInTimeline = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant();
+    assertFalse(indexInstantInTimeline.isPresent());
+    assertFalse(metadataPartitionExists(basePath, context, COLUMN_STATS));
+
+    // check other partitions are intact
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, FILES));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+  }
+
+  @Test
+  public void testTwoIndexersOneCreateOneDropPartition() {
+    initTestDataGenerator();
+    String tableName = "indexer_test";
+    HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath, tableName);
+    // enable files on the regular write client
+    HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false);
+    HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfigBuilder.build()).build();
+    // do one upsert with synchronous metadata update
+    SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
+    String instant = "0001";
+    writeClient.startCommitWithTime(instant);
+    List<HoodieRecord> records = dataGen.generateInserts(instant, 100);
+    JavaRDD<WriteStatus> result = writeClient.upsert(jsc.parallelize(records, 1), instant);
+    List<WriteStatus> statuses = result.collect();
+    assertNoWriteErrors(statuses);
+
+    // validate files partition built successfully
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, FILES));
+
+    // build indexer config which has only bloom_filters enabled
+    HoodieIndexer.Config config = getHoodieIndexConfig(BLOOM_FILTERS.name(), SCHEDULE_AND_EXECUTE, "delta-streamer-config/indexer-only-bloom.properties");
+    // start the indexer and validate bloom_filters index is also complete
+    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
+    assertEquals(0, indexer.start(0));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+
+    // completed index timeline for later validation
+    Option<HoodieInstant> bloomIndexInstant = metaClient.reloadActiveTimeline().filterCompletedIndexTimeline().lastInstant();
+    assertTrue(bloomIndexInstant.isPresent());
+
+    // build indexer config which has only column_stats enabled
+    config = getHoodieIndexConfig(COLUMN_STATS.name(), SCHEDULE, "delta-streamer-config/indexer.properties");
+
+    // schedule indexing and validate column_stats index is also initialized
+    // and indexing.requested instant is present
+    indexer = new HoodieIndexer(jsc, config);
+    assertEquals(0, indexer.start(0));
+    Option<HoodieInstant> columnStatsIndexInstant = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant();
+    assertTrue(columnStatsIndexInstant.isPresent());
+    assertEquals(REQUESTED, columnStatsIndexInstant.get().getState());
+    assertTrue(metadataPartitionExists(basePath, context, COLUMN_STATS));
+
+    // drop column_stats and validate indexing.requested is also removed from the timeline
+    // and completed indexing instant corresponding to bloom_filters index is still present
+    dropIndexAndAssert(COLUMN_STATS, "delta-streamer-config/indexer.properties", Option.empty());
+
+    // check other partitions are intact
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, FILES));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+
+    // drop bloom filter partition. timeline files should not be deleted since the index building is complete.
+    dropIndexAndAssert(BLOOM_FILTERS, "delta-streamer-config/indexer-only-bloom.properties", bloomIndexInstant);
+  }
+
+  private void dropIndexAndAssert(MetadataPartitionType indexType, String resourceFilePath, Option<HoodieInstant> completedIndexInstant) {
+    HoodieIndexer.Config config = getHoodieIndexConfig(indexType.name(), DROP_INDEX, resourceFilePath);
+    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
+    assertEquals(0, indexer.start(0));
+    Option<HoodieInstant> pendingFlights = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant();
+    assertFalse(pendingFlights.isPresent());
+    assertFalse(metadataPartitionExists(basePath, context, indexType));
+    if (completedIndexInstant.isPresent()) {
+      assertEquals(completedIndexInstant, metaClient.reloadActiveTimeline().filterCompletedIndexTimeline().lastInstant());
+    }
+  }
+
+  private HoodieIndexer.Config getHoodieIndexConfig(String indexType, String runMode, String resourceFilePath) {
+    HoodieIndexer.Config config = new HoodieIndexer.Config();
+    String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource(resourceFilePath)).getPath();
+    config.basePath = basePath;
+    config.tableName = tableName;
+    config.indexTypes = indexType;
+    config.runningMode = runMode;
+    config.propsFilePath = propsPath;
+    return config;
   }
 
   private static HoodieWriteConfig.Builder getWriteConfigBuilder(String basePath, String tableName) {
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/indexer-only-bloom.properties b/hudi-utilities/src/test/resources/delta-streamer-config/indexer-only-bloom.properties
new file mode 100644
index 0000000000..6035077437
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/indexer-only-bloom.properties
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+hoodie.metadata.enable=true
+hoodie.metadata.index.async=true
+hoodie.metadata.index.bloom.filter.enable=true
+hoodie.metadata.index.check.timeout.seconds=60
+hoodie.write.concurrency.mode=optimistic_concurrency_control
+hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider
\ No newline at end of file
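
The new properties file enables the metadata table, asynchronous indexing, and the bloom-filter index, and configures optimistic concurrency with the in-process lock provider — which is what lets the indexer and the regular writer operate on the same table in the tests above. A minimal sketch of the same keys built in code (TypedProperties is Hudi's Properties subclass; the tests hand these to the indexer via propsFilePath, so passing them programmatically is an assumption):

    // import org.apache.hudi.common.config.TypedProperties;
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.metadata.enable", "true");
    props.setProperty("hoodie.metadata.index.async", "true");
    props.setProperty("hoodie.metadata.index.bloom.filter.enable", "true");
    props.setProperty("hoodie.metadata.index.check.timeout.seconds", "60");
    props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
    props.setProperty("hoodie.write.lock.provider",
        "org.apache.hudi.client.transaction.lock.InProcessLockProvider");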