You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/31 11:19:02 UTC
[hudi] branch master updated: [HUDI-5553] Prevent partition(s) from being dropped if there are pending… (#7669)
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d8576933a9b [HUDI-5553] Prevent partition(s) from being dropped if there are pending… (#7669)
d8576933a9b is described below
commit d8576933a9b30596a76819e7385de78cc25afe5b
Author: voonhous <vo...@gmail.com>
AuthorDate: Tue Jan 31 19:18:55 2023 +0800
[HUDI-5553] Prevent partition(s) from being dropped if there are pending… (#7669)
---
.../hudi/client/utils/DeletePartitionUtils.java | 77 ++++++++++++
.../client/utils/TestDeletePartitionUtils.java | 110 ++++++++++++++++++
.../FlinkDeletePartitionCommitActionExecutor.java | 3 +
.../SparkDeletePartitionCommitActionExecutor.java | 3 +
.../sql/hudi/TestAlterTableDropPartition.scala | 129 ++++++++++++++++++++-
5 files changed, 321 insertions(+), 1 deletion(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java
new file mode 100644
index 00000000000..92c2065457e
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.exception.HoodieDeletePartitionException;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A utility class for helper functions when performing a delete partition operation.
+ */
+public class DeletePartitionUtils {
+
+  /**
+   * Checks whether any pending table service action (requested or inflight) on the table affects one of the
+   * partitions to be dropped.
+   * <p>
+   * This check prevents a drop-partition from proceeding while a partition has a table service action in the
+   * pending stage. If that were allowed, a file group that is an input for the table service action might also be
+   * a candidate for being replaced by the drop-partition, so once both commits complete there would be two commits
+   * replacing a single file group.
+   * <p>
+   * For example, a timeline might have the following execution order:
+   * <pre>{@code
+   * 000.replacecommit.requested (clustering filegroup_1 + filegroup_2 -> filegroup_3)
+   * 001.replacecommit.requested, 001.replacecommit.inflight, 001.replacecommit (drop_partition to replace filegroup_1)
+   * 000.replacecommit.inflight (clustering is executed now)
+   * 000.replacecommit (clustering completed)
+   * }</pre>
+   * With the execution order above, 000.replacecommit and 001.replacecommit would both flag filegroup_1 as replaced,
+   * which causes duplicate key errors downstream when a map is being constructed.
+   *
+   * @param table            table to perform validation on; its slice view is cast to {@link SyncableFileSystemView}
+   * @param partitionsToDrop partitions to drop
+   * @throws HoodieDeletePartitionException if any pending compaction, log compaction or clustering operation involves
+   *                                        one of {@code partitionsToDrop}; the message lists each offending instant
+   *                                        once
+   */
+  public static void checkForPendingTableServiceActions(HoodieTable table, List<String> partitionsToDrop) {
+    // Hash-based lookup so each pending operation is matched in O(1) instead of scanning the list.
+    Set<String> partitionSet = new HashSet<>(partitionsToDrop);
+    SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView();
+
+    // Pending compaction and log-compaction operations are (instant time, compaction operation) pairs.
+    Stream<String> offendingCompactionInstants =
+        Stream.concat(fileSystemView.getPendingCompactionOperations(), fileSystemView.getPendingLogCompactionOperations())
+            .filter(op -> partitionSet.contains(op.getRight().getPartitionPath()))
+            .map(op -> op.getLeft());
+
+    // Pending clustering is reported as (file group id, instant) pairs, hence a separate stream.
+    Stream<String> offendingClusteringInstants = fileSystemView.getFileGroupsInPendingClustering()
+        .filter(fgIdInstantPair -> partitionSet.contains(fgIdInstantPair.getLeft().getPartitionPath()))
+        .map(fgIdInstantPair -> fgIdInstantPair.getRight().getTimestamp());
+
+    // Several file groups may belong to the same pending instant; report each instant once, in encounter order.
+    List<String> offendingInstants = Stream.concat(offendingCompactionInstants, offendingClusteringInstants)
+        .distinct()
+        .collect(Collectors.toCollection(ArrayList::new));
+
+    if (!offendingInstants.isEmpty()) {
+      throw new HoodieDeletePartitionException("Failed to drop partitions. "
+          + "Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: " + partitionsToDrop + ". "
+          + "Instant(s) of offending pending table service action: "
+          + offendingInstants);
+    }
+  }
+
+}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestDeletePartitionUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestDeletePartitionUtils.java
new file mode 100644
index 00000000000..3a1632737ef
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestDeletePartitionUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieDeletePartitionException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests for {@link DeletePartitionUtils}, driven by every combination of pending compaction, pending log compaction
+ * and pending clustering operations on the partition being dropped.
+ */
+public class TestDeletePartitionUtils {
+
+  private static final String PARTITION_IN_PENDING_SERVICE_ACTION = "partition_with_pending_table_service_action";
+  private static final String HARDCODED_INSTANT_TIME = "0";
+
+  private final HoodieTable table = Mockito.mock(HoodieTable.class);
+
+  private final SyncableFileSystemView fileSystemView = Mockito.mock(SyncableFileSystemView.class);
+
+  /**
+   * Produces a full truth table over three boolean variables: 2^3 rows, where column {@code j} of row {@code i}
+   * is bit {@code j} of {@code i}.
+   */
+  public static Stream<Arguments> generateTruthValues() {
+    int noOfVariables = 3;
+    int noOfRows = 1 << noOfVariables;
+    Object[][] truthValues = new Object[noOfRows][noOfVariables];
+    for (int i = 0; i < noOfRows; i++) {
+      for (int j = 0; j < noOfVariables; j++) {
+        truthValues[i][j] = ((i >> j) & 1) == 1;
+      }
+    }
+    return Stream.of(truthValues).map(Arguments::of);
+  }
+
+  // JUnit labels each invocation with its inputs, so a failure identifies the combination without stdout logging.
+  @ParameterizedTest(name = "hasPendingCompactionOperations={0}, hasPendingLogCompactionOperations={1}, hasFileGroupsInPendingClustering={2}")
+  @MethodSource("generateTruthValues")
+  public void testDeletePartitionUtils(
+      boolean hasPendingCompactionOperations,
+      boolean hasPendingLogCompactionOperations,
+      boolean hasFileGroupsInPendingClustering) {
+    Mockito.when(table.getSliceView()).thenReturn(fileSystemView);
+    Mockito.when(fileSystemView.getPendingCompactionOperations()).thenReturn(createPendingCompactionOperations(hasPendingCompactionOperations));
+    Mockito.when(fileSystemView.getPendingLogCompactionOperations()).thenReturn(createPendingCompactionOperations(hasPendingLogCompactionOperations));
+    Mockito.when(fileSystemView.getFileGroupsInPendingClustering()).thenReturn(createFileGroupsInPendingClustering(hasFileGroupsInPendingClustering));
+
+    // Any pending table service action touching the partition must block the drop.
+    boolean shouldThrowException = hasPendingCompactionOperations || hasPendingLogCompactionOperations || hasFileGroupsInPendingClustering;
+
+    if (shouldThrowException) {
+      assertThrows(HoodieDeletePartitionException.class,
+          () -> DeletePartitionUtils.checkForPendingTableServiceActions(table,
+              Collections.singletonList(PARTITION_IN_PENDING_SERVICE_ACTION)));
+    } else {
+      assertDoesNotThrow(() -> DeletePartitionUtils.checkForPendingTableServiceActions(table,
+          Collections.singletonList(PARTITION_IN_PENDING_SERVICE_ACTION)));
+    }
+  }
+
+  /** A single pending (log) compaction operation, placed in the dropped partition only when requested. */
+  private static Stream<Pair<String, CompactionOperation>> createPendingCompactionOperations(boolean hasPendingCompactionOperations) {
+    return Stream.of(Pair.of(HARDCODED_INSTANT_TIME, getCompactionOperation(hasPendingCompactionOperations)));
+  }
+
+  private static CompactionOperation getCompactionOperation(boolean hasPendingJobInPartition) {
+    return new CompactionOperation(
+        "fileId", getPartitionName(hasPendingJobInPartition), HARDCODED_INSTANT_TIME, Option.empty(),
+        new ArrayList<>(), Option.empty(), Option.empty(), new HashMap<>());
+  }
+
+  /** A single file group in pending clustering, placed in the dropped partition only when requested. */
+  private static Stream<Pair<HoodieFileGroupId, HoodieInstant>> createFileGroupsInPendingClustering(boolean hasFileGroupsInPendingClustering) {
+    HoodieFileGroupId hoodieFileGroupId = new HoodieFileGroupId(getPartitionName(hasFileGroupsInPendingClustering), "fileId");
+    HoodieInstant hoodieInstant = new HoodieInstant(true, "replacecommit", HARDCODED_INSTANT_TIME);
+    return Stream.of(Pair.of(hoodieFileGroupId, hoodieInstant));
+  }
+
+  private static String getPartitionName(boolean hasPendingTableServiceAction) {
+    return hasPendingTableServiceAction ? PARTITION_IN_PENDING_SERVICE_ACTION : "unaffected_partition";
+  }
+
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java
index a301ba228e4..3f19534d08c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.commit;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.DeletePartitionUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -62,6 +63,8 @@ public class FlinkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayl
@Override
public HoodieWriteMetadata<List<WriteStatus>> execute() {
+ DeletePartitionUtils.checkForPendingTableServiceActions(table, partitions);
+
try {
HoodieTimer timer = new HoodieTimer().startTimer();
context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
index 49134d604d2..b45a691fbad 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.DeletePartitionUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.WriteOperationType;
@@ -59,6 +60,8 @@ public class SparkDeletePartitionCommitActionExecutor<T>
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+ DeletePartitionUtils.checkForPendingTableServiceActions(table, partitions);
+
try {
HoodieTimer timer = HoodieTimer.start();
context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index 15b14ec77f5..02c558fc2f3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -18,14 +18,17 @@
package org.apache.spark.sql.hudi
import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.{HoodieCLIUtils, HoodieSparkUtils}
import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
+import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.common.util.{PartitionPathEncodeUtils, StringUtils}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
import org.apache.spark.sql.SaveMode
import org.junit.jupiter.api.Assertions
+import org.junit.jupiter.api.Assertions.assertTrue
class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
@@ -396,4 +399,128 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
}
}
}
+
+  test("Prevent a partition from being dropped if there are pending CLUSTERING jobs") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Keep the table directory inside `tmp` (note the '/' separator) rather than in a sibling directory
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      spark.sql(
+        s"""
+           |create table $tableName (
+           | id int,
+           | name string,
+           | price double,
+           | ts long
+           |) using hudi
+           | options (
+           | primaryKey ='id',
+           | type = 'cow',
+           | preCombineField = 'ts'
+           | )
+           | partitioned by(ts)
+           | location '$basePath'
+           | """.stripMargin)
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+      val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
+      try {
+        // Generate the first clustering plan
+        val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
+        client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
+
+        checkAnswer(s"call show_clustering('$tableName')")(
+          Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), "*")
+        )
+
+        // Dropping a partition covered by the pending clustering plan must be rejected
+        val partition = "ts=1002"
+        val errMsg = s"Failed to drop partitions. Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: [$partition]"
+        checkExceptionContain(s"ALTER TABLE $tableName DROP PARTITION($partition)")(errMsg)
+      } finally {
+        // Release the write client even when an assertion above fails
+        client.close()
+      }
+    }
+  }
+
+  test("Prevent a partition from being dropped if there are pending COMPACTs jobs") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Keep the table directory inside `tmp` (note the '/' separator) rather than in a sibling directory
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      // Using INMEMORY index type to ensure that deltacommits generate log files instead of parquet
+      spark.sql(
+        s"""
+           |create table $tableName (
+           | id int,
+           | name string,
+           | price double,
+           | ts long
+           |) using hudi
+           | options (
+           | primaryKey ='id',
+           | type = 'mor',
+           | preCombineField = 'ts',
+           | hoodie.index.type = 'INMEMORY'
+           | )
+           | partitioned by(ts)
+           | location '$basePath'
+           | """.stripMargin)
+      // Create 5 deltacommits so that the default `hoodie.compact.inline.max.delta.commits` threshold is reached
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+      spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
+      spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
+      val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
+      try {
+        // Generate the first compaction plan
+        val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
+        assertTrue(client.scheduleCompactionAtInstant(firstScheduleInstant, HOption.empty()))
+
+        checkAnswer(s"call show_compaction('$tableName')")(
+          Seq(firstScheduleInstant, 5, HoodieInstant.State.REQUESTED.name())
+        )
+
+        // Dropping a partition covered by the pending compaction plan must be rejected
+        val partition = "ts=1002"
+        val errMsg = s"Failed to drop partitions. Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: [$partition]"
+        checkExceptionContain(s"ALTER TABLE $tableName DROP PARTITION($partition)")(errMsg)
+      } finally {
+        // Release the write client even when an assertion above fails
+        client.close()
+      }
+    }
+  }
+
+  test("Prevent a partition from being dropped if there are pending LOG_COMPACT jobs") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Keep the table directory inside `tmp` (note the '/' separator) rather than in a sibling directory
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      // Using INMEMORY index type to ensure that deltacommits generate log files instead of parquet
+      spark.sql(
+        s"""
+           |create table $tableName (
+           | id int,
+           | name string,
+           | price double,
+           | ts long
+           |) using hudi
+           | options (
+           | primaryKey ='id',
+           | type = 'mor',
+           | preCombineField = 'ts',
+           | hoodie.index.type = 'INMEMORY'
+           | )
+           | partitioned by(ts)
+           | location '$basePath'
+           | """.stripMargin)
+      // Create 5 deltacommits so that the default `hoodie.compact.inline.max.delta.commits` threshold is reached.
+      // Every record uses ts=1000, so all writes land in the same partition as separate log blocks.
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
+      spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
+      spark.sql(s"insert into $tableName values(5, 'a5', 10, 1000)")
+      val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
+      try {
+        // Generate the first log_compaction plan
+        val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
+        assertTrue(client.scheduleLogCompactionAtInstant(firstScheduleInstant, HOption.empty()))
+
+        // Dropping the partition covered by the pending log compaction plan must be rejected
+        val partition = "ts=1000"
+        val errMsg = s"Failed to drop partitions. Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: [$partition]"
+        checkExceptionContain(s"ALTER TABLE $tableName DROP PARTITION($partition)")(errMsg)
+      } finally {
+        // Release the write client even when an assertion above fails
+        client.close()
+      }
+    }
+  }
}