You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/31 11:19:02 UTC

[hudi] branch master updated: [HUDI-5553] Prevent partition(s) from being dropped if there are pending… (#7669)

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d8576933a9b [HUDI-5553] Prevent partition(s) from being dropped if there are pending… (#7669)
d8576933a9b is described below

commit d8576933a9b30596a76819e7385de78cc25afe5b
Author: voonhous <vo...@gmail.com>
AuthorDate: Tue Jan 31 19:18:55 2023 +0800

    [HUDI-5553] Prevent partition(s) from being dropped if there are pending… (#7669)
---
 .../hudi/client/utils/DeletePartitionUtils.java    |  77 ++++++++++++
 .../client/utils/TestDeletePartitionUtils.java     | 110 ++++++++++++++++++
 .../FlinkDeletePartitionCommitActionExecutor.java  |   3 +
 .../SparkDeletePartitionCommitActionExecutor.java  |   3 +
 .../sql/hudi/TestAlterTableDropPartition.scala     | 129 ++++++++++++++++++++-
 5 files changed, 321 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java
new file mode 100644
index 00000000000..92c2065457e
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.exception.HoodieDeletePartitionException;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A utility class for helper functions when performing a delete partition operation.
+ */
+public class DeletePartitionUtils {
+
+  /**
+   * Checks whether any pending table service action (requested + inflight compaction, log compaction or clustering)
+   * affects the partitions to be dropped, and throws a {@link HoodieDeletePartitionException} if one does.
+   * <p>
+   * This check prevents a drop-partition from proceeding while a partition has a table service action in the
+   * pending stage. If that were allowed to happen, a filegroup that is an input for the table service action might
+   * also be a candidate for being replaced. As such, when the table service action and drop-partition commits are
+   * committed, there would be two commits replacing a single filegroup.
+   * <p>
+   * For example, a timeline might have an execution order as such:
+   * 000.replacecommit.requested (clustering filegroup_1 + filegroup_2 -> filegroup_3)
+   * 001.replacecommit.requested, 001.replacecommit.inflight, 001.replacecommit (drop_partition to replace filegroup_1)
+   * 000.replacecommit.inflight (clustering is executed now)
+   * 000.replacecommit (clustering completed)
+   * For an execution order as shown above, 000.replacecommit and 001.replacecommit will both flag filegroup_1 to be replaced.
+   * This will cause downstream duplicate key errors when a map is being constructed.
+   *
+   * @param table Table to perform validation on
+   * @param partitionsToDrop List of partitions to drop
+   */
+  public static void checkForPendingTableServiceActions(HoodieTable table, List<String> partitionsToDrop) {
+    List<String> instantsOfOffendingPendingTableServiceAction = new ArrayList<>();
+    // collect the instants of every pending compaction/log compaction/clustering operation involving these partitions
+    SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView();
+
+    // pending (log) compaction operations are iterated separately from clustering as they return different stream types
+    Stream.concat(fileSystemView.getPendingCompactionOperations(), fileSystemView.getPendingLogCompactionOperations())
+        .filter(op -> partitionsToDrop.contains(op.getRight().getPartitionPath()))
+        .forEach(op -> instantsOfOffendingPendingTableServiceAction.add(op.getLeft()));
+
+    fileSystemView.getFileGroupsInPendingClustering()
+        .filter(fgIdInstantPair -> partitionsToDrop.contains(fgIdInstantPair.getLeft().getPartitionPath()))
+        .forEach(x -> instantsOfOffendingPendingTableServiceAction.add(x.getRight().getTimestamp()));
+
+    if (instantsOfOffendingPendingTableServiceAction.size() > 0) {
+      throw new HoodieDeletePartitionException("Failed to drop partitions. "
+          + "Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: " + partitionsToDrop + ". "
+          + "Instant(s) of offending pending table service action: "
+          + instantsOfOffendingPendingTableServiceAction.stream().distinct().collect(Collectors.toList()));
+    }
+  }
+
+}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestDeletePartitionUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestDeletePartitionUtils.java
new file mode 100644
index 00000000000..3a1632737ef
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestDeletePartitionUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieDeletePartitionException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class TestDeletePartitionUtils {
+
+  private static final String PARTITION_IN_PENDING_SERVICE_ACTION = "partition_with_pending_table_service_action";
+  private static final String HARDCODED_INSTANT_TIME = "0";
+
+  private final HoodieTable table = Mockito.mock(HoodieTable.class);
+
+  private final SyncableFileSystemView fileSystemView = Mockito.mock(SyncableFileSystemView.class);
+
+  public static Stream<Arguments> generateTruthValues() { // yields all 2^3 combinations of three booleans
+    int noOfVariables = 3;
+    int noOfRows = 1 << noOfVariables;
+    Object[][] truthValues = new Object[noOfRows][noOfVariables];
+    for (int i = 0; i < noOfRows; i++) {
+      for (int j = noOfVariables - 1; j >= 0; j--) {
+        boolean out = (i / (1 << j)) % 2 != 0; // true when bit j of row index i is set
+        truthValues[i][j] = out;
+      }
+    }
+    return Stream.of(truthValues).map(Arguments::of);
+  }
+
+  @ParameterizedTest
+  @MethodSource("generateTruthValues")
+  public void testDeletePartitionUtils(
+      boolean hasPendingCompactionOperations,
+      boolean hasPendingLogCompactionOperations,
+      boolean hasFileGroupsInPendingClustering) {
+    System.out.printf("hasPendingCompactionOperations: %s, hasPendingLogCompactionOperations: %s, hasFileGroupsInPendingClustering: %s%n",
+        hasPendingCompactionOperations, hasPendingLogCompactionOperations, hasFileGroupsInPendingClustering);
+    Mockito.when(table.getSliceView()).thenReturn(fileSystemView);
+    Mockito.when(fileSystemView.getPendingCompactionOperations()).thenReturn(createPendingCompactionOperations(hasPendingCompactionOperations));
+    Mockito.when(fileSystemView.getPendingLogCompactionOperations()).thenReturn(createPendingCompactionOperations(hasPendingLogCompactionOperations));
+    Mockito.when(fileSystemView.getFileGroupsInPendingClustering()).thenReturn(createFileGroupsInPendingClustering(hasFileGroupsInPendingClustering));
+
+    boolean shouldThrowException = hasPendingCompactionOperations || hasPendingLogCompactionOperations || hasFileGroupsInPendingClustering; // any one pending action must block the drop
+
+    if (shouldThrowException) {
+      assertThrows(HoodieDeletePartitionException.class,
+          () -> DeletePartitionUtils.checkForPendingTableServiceActions(table,
+              Collections.singletonList(PARTITION_IN_PENDING_SERVICE_ACTION)));
+    } else {
+      assertDoesNotThrow(() -> DeletePartitionUtils.checkForPendingTableServiceActions(table,
+          Collections.singletonList(PARTITION_IN_PENDING_SERVICE_ACTION)));
+    }
+  }
+
+  private static Stream<Pair<String, CompactionOperation>> createPendingCompactionOperations(boolean hasPendingCompactionOperations) {
+    return Stream.of(Pair.of(HARDCODED_INSTANT_TIME, getCompactionOperation(hasPendingCompactionOperations)));
+  }
+
+  private static CompactionOperation getCompactionOperation(boolean hasPendingJobInPartition) { // an operation is always built; the flag only picks its partition
+    return new CompactionOperation(
+        "fileId", getPartitionName(hasPendingJobInPartition), HARDCODED_INSTANT_TIME, Option.empty(),
+        new ArrayList<>(), Option.empty(), Option.empty(), new HashMap<>());
+  }
+
+  private static Stream<Pair<HoodieFileGroupId, HoodieInstant>> createFileGroupsInPendingClustering(boolean hasFileGroupsInPendingClustering) {
+    HoodieFileGroupId hoodieFileGroupId = new HoodieFileGroupId(getPartitionName(hasFileGroupsInPendingClustering), "fileId");
+    HoodieInstant hoodieInstant = new HoodieInstant(true, "replacecommit", HARDCODED_INSTANT_TIME);
+    return Stream.of(Pair.of(hoodieFileGroupId, hoodieInstant));
+  }
+
+  private static String getPartitionName(boolean hasPendingTableServiceAction) { // the partition under test when true, an unrelated one otherwise
+    return hasPendingTableServiceAction ? PARTITION_IN_PENDING_SERVICE_ACTION : "unaffected_partition";
+  }
+
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java
index a301ba228e4..3f19534d08c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.commit;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.DeletePartitionUtils;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -62,6 +63,8 @@ public class FlinkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayl
 
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    DeletePartitionUtils.checkForPendingTableServiceActions(table, partitions);
+
     try {
       HoodieTimer timer = new HoodieTimer().startTimer();
       context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
index 49134d604d2..b45a691fbad 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.DeletePartitionUtils;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -59,6 +60,8 @@ public class SparkDeletePartitionCommitActionExecutor<T>
 
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+    DeletePartitionUtils.checkForPendingTableServiceActions(table, partitions);
+
     try {
       HoodieTimer timer = HoodieTimer.start();
       context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index 15b14ec77f5..02c558fc2f3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -18,14 +18,17 @@
 package org.apache.spark.sql.hudi
 
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.{HoodieCLIUtils, HoodieSparkUtils}
 import org.apache.hudi.common.model.HoodieCommitMetadata
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
+import org.apache.hudi.common.util.{Option => HOption}
 import org.apache.hudi.common.util.{PartitionPathEncodeUtils, StringUtils}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
 import org.apache.spark.sql.SaveMode
 import org.junit.jupiter.api.Assertions
+import org.junit.jupiter.api.Assertions.assertTrue
 
 class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
 
@@ -396,4 +399,128 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Prevent a partition from being dropped if there are pending CLUSTERING jobs") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}t/$tableName"
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | options (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           | )
+           | partitioned by(ts)
+           | location '$basePath'
+           | """.stripMargin)
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+      val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
+
+      // Schedule a clustering plan without executing it, so that it is still in the REQUESTED state
+      val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
+      client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
+
+      checkAnswer(s"call show_clustering('$tableName')")(
+        Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), "*")
+      )
+
+      val partition = "ts=1002" // partition written by the third insert above
+      val errMsg = s"Failed to drop partitions. Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: [$partition]"
+      checkExceptionContain(s"ALTER TABLE $tableName DROP PARTITION($partition)")(errMsg)
+    }
+  }
+
+  test("Prevent a partition from being dropped if there are pending COMPACTs jobs") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}t/$tableName"
+      // Using INMEMORY index type to ensure that deltacommits generate log files instead of parquet
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | options (
+           |  primaryKey ='id',
+           |  type = 'mor',
+           |  preCombineField = 'ts',
+           |  hoodie.index.type = 'INMEMORY'
+           | )
+           | partitioned by(ts)
+           | location '$basePath'
+           | """.stripMargin)
+      // Create 5 deltacommits to reach the default `hoodie.compact.inline.max.delta.commits` (NOTE(review): confirm whether the bound is > or >=)
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+      spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
+      spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
+      val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
+
+      // Schedule a compaction plan without executing it, so that it is still in the REQUESTED state
+      val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
+      assertTrue(client.scheduleCompactionAtInstant(firstScheduleInstant, HOption.empty()))
+
+      checkAnswer(s"call show_compaction('$tableName')")(
+        Seq(firstScheduleInstant, 5, HoodieInstant.State.REQUESTED.name())
+      )
+
+      val partition = "ts=1002" // partition written by the third insert above
+      val errMsg = s"Failed to drop partitions. Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: [$partition]"
+      checkExceptionContain(s"ALTER TABLE $tableName DROP PARTITION($partition)")(errMsg)
+    }
+  }
+
+  test("Prevent a partition from being dropped if there are pending LOG_COMPACT jobs") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}t/$tableName"
+      // Using INMEMORY index type to ensure that deltacommits generate log files instead of parquet
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | options (
+           |  primaryKey ='id',
+           |  type = 'mor',
+           |  preCombineField = 'ts',
+           |  hoodie.index.type = 'INMEMORY'
+           | )
+           | partitioned by(ts)
+           | location '$basePath'
+           | """.stripMargin)
+      // Create 5 deltacommits; NOTE(review): `hoodie.compact.inline.max.delta.commits` is cited here as in the compaction test - confirm it governs log compaction
+      // Write everything into the same partition (ts=1000), intended to land in the same FileGroup but in separate blocks
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
+      spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
+      spark.sql(s"insert into $tableName values(5, 'a5', 10, 1000)")
+      val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
+
+      // Schedule a log_compaction plan without executing it, so that it is still pending when the drop is attempted
+      val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
+      assertTrue(client.scheduleLogCompactionAtInstant(firstScheduleInstant, HOption.empty()))
+
+      val partition = "ts=1000" // the only partition written above
+      val errMsg = s"Failed to drop partitions. Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: [$partition]"
+      checkExceptionContain(s"ALTER TABLE $tableName DROP PARTITION($partition)")(errMsg)
+    }
+  }
 }