Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/17 11:17:15 UTC
[flink-table-store] branch master updated: [FLINK-30711] Support dropping partition via Flink action
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new f503591b [FLINK-30711] Support dropping partition via Flink action
f503591b is described below
commit f503591b7f5ee77f876296110840c87d7672bb9b
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Tue Jan 17 19:17:09 2023 +0800
[FLINK-30711] Support dropping partition via Flink action
This closes #487
---
docs/content/docs/how-to/writing-tables.md | 54 ++++--
flink-table-store-connector/pom.xml | 8 +
.../flink/table/store/connector/action/Action.java | 131 +++++++++++++
.../store/connector/action/CompactAction.java | 74 +++-----
.../connector/action/DropPartitionAction.java | 115 ++++++++++++
.../table/store/connector/action/FlinkActions.java | 32 +---
.../store/connector/action/ActionITCaseBase.java | 111 +++++++++++
.../connector/action/CompactActionITCase.java | 202 +++++++--------------
.../connector/action/DropPartitionITCase.java | 176 ++++++++++++++++++
.../store/connector/util/AbstractTestBase.java | 3 +-
.../store/file/operation/FileStoreCommit.java | 21 ++-
.../store/file/operation/FileStoreCommitImpl.java | 42 +++--
.../flink/table/store/table/sink/TableCommit.java | 16 +-
.../table/store/tests/FlinkActionsE2eTest.java | 89 +++++++--
14 files changed, 817 insertions(+), 257 deletions(-)
diff --git a/docs/content/docs/how-to/writing-tables.md b/docs/content/docs/how-to/writing-tables.md
index db0e4661..135d7443 100644
--- a/docs/content/docs/how-to/writing-tables.md
+++ b/docs/content/docs/how-to/writing-tables.md
@@ -157,29 +157,23 @@ INSERT OVERWRITE MyTable SELECT * FROM MyTable WHERE false
{{< /tabs >}}
-## Purging a Partition
+## Purging Partitions
-Particularly, you can use `INSERT OVERWRITE` to purge data of a partition by inserting empty value to the partition:
+Currently, Table Store supports two ways to purge partitions.
-{{< tabs "purge-partition-syntax" >}}
+1. Like purging tables, you can use `INSERT OVERWRITE` to purge data of partitions by inserting empty values into them.
-{{< tab "Flink" >}}
-
-```sql
-INSERT OVERWRITE MyTable PARTITION (key1 = value1, key2 = value2, ...) SELECT selectSpec FROM MyTable WHERE false
-```
+2. Method #1 does not support dropping multiple partitions. If you need to drop multiple partitions, you can submit a drop-partition job through `flink run`.
-{{< /tab >}}
-
-{{< /tabs >}}
-
-The following SQL is an example:
-
-{{< tabs "purge-partition-example" >}}
+{{< tabs "purge-partitions-syntax" >}}
{{< tab "Flink" >}}
```sql
+-- Syntax
+INSERT OVERWRITE MyTable PARTITION (key1 = value1, key2 = value2, ...) SELECT selectSpec FROM MyTable WHERE false
+
+-- The following SQL is an example:
-- table definition
CREATE TABLE MyTable (
k0 INT,
@@ -196,4 +190,34 @@ INSERT OVERWRITE MyTable PARTITION (k0 = 0, k1 = 0) SELECT v FROM MyTable WHERE
{{< /tab >}}
+{{< tab "Flink Job" >}}
+
+Run the following command to submit a drop-partition job for the table.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ -c org.apache.flink.table.store.connector.action.FlinkActions \
+ /path/to/flink-table-store-dist-{{< version >}}.jar \
+ drop-partition \
+ --warehouse <warehouse-path> \
+ --database <database-name> \
+ --table <table-name> \
+ --partition <partition_spec> \
+ [--partition <partition_spec> ...]
+
+partition_spec:
+key1=value1,key2=value2...
+```
+
+For more information about drop-partition, run:
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ -c org.apache.flink.table.store.connector.action.FlinkActions \
+ /path/to/flink-table-store-dist-{{< version >}}.jar \
+ drop-partition --help
+```
+
+{{< /tab >}}
+
{{< /tabs >}}
diff --git a/flink-table-store-connector/pom.xml b/flink-table-store-connector/pom.xml
index 5a8a2c2a..ac13d88d 100644
--- a/flink-table-store-connector/pom.xml
+++ b/flink-table-store-connector/pom.xml
@@ -83,6 +83,14 @@ under the License.
<!-- test dependencies -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-store-format</artifactId>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/Action.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/Action.java
new file mode 100644
index 00000000..59a66e97
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/Action.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.action;
+
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Interface for Flink actions. */
+public interface Action {
+
+ /** The execution method of the action. */
+ void run() throws Exception;
+
+ @Nullable
+ static Path getTablePath(MultipleParameterTool params) {
+ String warehouse = params.get("warehouse");
+ String database = params.get("database");
+ String table = params.get("table");
+ String path = params.get("path");
+
+ Path tablePath = null;
+ int count = 0;
+ if (warehouse != null || database != null || table != null) {
+ if (warehouse == null || database == null || table == null) {
+ System.err.println(
+ "Warehouse, database and table must be specified all at once.\n"
+ + "Run <action> --help for help.");
+ return null;
+ }
+ tablePath = new Path(new Path(warehouse, database + ".db"), table);
+ count++;
+ }
+ if (path != null) {
+ tablePath = new Path(path);
+ count++;
+ }
+
+ if (count != 1) {
+ System.err.println(
+ "Please specify either \"warehouse, database and table\" or \"path\".\n"
+ + "Run <action> --help for help.");
+ return null;
+ }
+
+ return tablePath;
+ }
+
+ @Nullable
+ static List<Map<String, String>> getPartitions(MultipleParameterTool params) {
+ List<Map<String, String>> partitions = new ArrayList<>();
+ for (String partition : params.getMultiParameter("partition")) {
+ Map<String, String> kvs = new HashMap<>();
+ for (String kvString : partition.split(",")) {
+ String[] kv = kvString.split("=");
+ if (kv.length != 2) {
+ System.err.print(
+ "Invalid key-value pair \""
+ + kvString
+ + "\".\n"
+ + "Run <action> --help for help.");
+ return null;
+ }
+ kvs.put(kv[0], kv[1]);
+ }
+ partitions.add(kvs);
+ }
+
+ return partitions;
+ }
+
+ /** Factory to create {@link Action}. */
+ class Factory {
+
+ // supported actions
+ private static final String COMPACT = "compact";
+ private static final String DROP_PARTITION = "drop-partition";
+
+ public static Optional<Action> create(String[] args) {
+ String action = args[0].toLowerCase();
+ String[] actionArgs = Arrays.copyOfRange(args, 1, args.length);
+
+ switch (action) {
+ case COMPACT:
+ return CompactAction.create(actionArgs);
+ case DROP_PARTITION:
+ return DropPartitionAction.create(actionArgs);
+ default:
+ System.err.println("Unknown action \"" + action + "\"");
+ printHelp();
+ return Optional.empty();
+ }
+ }
+
+ static void printHelp() {
+ System.out.println("Usage: <action> [OPTIONS]");
+ System.out.println();
+
+ System.out.println("Available actions:");
+ System.out.println(" " + COMPACT);
+ System.out.println(" " + DROP_PARTITION);
+ System.out.println();
+
+ System.out.println("For detailed options of each action, run <action> --help");
+ }
+ }
+}
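For context, the repeated `--partition` flag handled by `Action.getPartitions` yields one map per flag. A minimal sketch of the expected behavior, assuming Flink's `MultipleParameterTool` semantics (the class name and argument values are illustrative, and same-package access to `Action` is assumed):

```java
package org.apache.flink.table.store.connector.action;

import org.apache.flink.api.java.utils.MultipleParameterTool;

import java.util.List;
import java.util.Map;

public class GetPartitionsSketch {
    public static void main(String[] args) {
        // Each --partition flag yields one map; specs split on ',' and '='.
        String[] cliArgs = {
            "--partition", "dt=20221126,hh=08",
            "--partition", "dt=20221127,hh=09"
        };
        MultipleParameterTool params = MultipleParameterTool.fromArgs(cliArgs);
        List<Map<String, String>> partitions = Action.getPartitions(params);
        // partitions -> [{dt=20221126, hh=08}, {dt=20221127, hh=09}]
        System.out.println(partitions);
    }
}
```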
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java
index 947056ad..a87f7699 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java
@@ -34,14 +34,20 @@ import org.apache.flink.table.store.connector.utils.StreamExecutionEnvironmentUt
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.FileStoreTableFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import static org.apache.flink.table.store.connector.action.Action.getPartitions;
+import static org.apache.flink.table.store.connector.action.Action.getTablePath;
+
/** Table compact action for Flink. */
-public class CompactAction {
+public class CompactAction implements Action {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompactAction.class);
private final CompactorSourceBuilder sourceBuilder;
private final CompactorSinkBuilder sinkBuilder;
@@ -79,68 +85,37 @@ public class CompactAction {
// Flink run methods
// ------------------------------------------------------------------------
- public static Optional<CompactAction> create(MultipleParameterTool params) {
+ public static Optional<Action> create(String[] args) {
+ LOG.info("Compact job args: {}", String.join(" ", args));
+
+ MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+
if (params.has("help")) {
printHelp();
return Optional.empty();
}
- String warehouse = params.get("warehouse");
- String database = params.get("database");
- String table = params.get("table");
- String path = params.get("path");
-
- Path tablePath = null;
- int count = 0;
- if (warehouse != null || database != null || table != null) {
- if (warehouse == null || database == null || table == null) {
- System.err.println(
- "Warehouse, database and table must be specified all at once.\n"
- + "Run compact --help for help.");
- return Optional.empty();
- }
- tablePath = new Path(new Path(warehouse, database + ".db"), table);
- count++;
- }
- if (path != null) {
- tablePath = new Path(path);
- count++;
- }
+ Path tablePath = getTablePath(params);
- if (count != 1) {
- System.err.println(
- "Please specify either \"warehouse, database and table\" or \"path\".\n"
- + "Run compact --help for help.");
+ if (tablePath == null) {
return Optional.empty();
}
CompactAction action = new CompactAction(tablePath);
if (params.has("partition")) {
- List<Map<String, String>> partitions = new ArrayList<>();
- for (String partition : params.getMultiParameter("partition")) {
- Map<String, String> kvs = new HashMap<>();
- for (String kvString : partition.split(",")) {
- String[] kv = kvString.split("=");
- if (kv.length != 2) {
- System.err.print(
- "Invalid key-value pair \""
- + kvString
- + "\".\n"
- + "Run compact --help for help.");
- return Optional.empty();
- }
- kvs.put(kv[0], kv[1]);
- }
- partitions.add(kvs);
+ List<Map<String, String>> partitions = getPartitions(params);
+ if (partitions == null) {
+ return Optional.empty();
}
+
action.withPartitions(partitions);
}
return Optional.of(action);
}
- public static void printHelp() {
+ private static void printHelp() {
System.out.println(
"Action \"compact\" runs a dedicated job for compacting specified table.");
System.out.println();
@@ -165,4 +140,11 @@ public class CompactAction {
" compact --warehouse hdfs:///path/to/warehouse --database test_db --table test_table "
+ "--partition dt=20221126,hh=08 --partition dt=20221127,hh=09");
}
+
+ @Override
+ public void run() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ build(env);
+ env.execute("Compact job");
+ }
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/DropPartitionAction.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/DropPartitionAction.java
new file mode 100644
index 00000000..11f58df3
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/DropPartitionAction.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.action;
+
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.sink.TableCommit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import static org.apache.flink.table.store.connector.action.Action.getPartitions;
+import static org.apache.flink.table.store.connector.action.Action.getTablePath;
+
+/** Table drop partition action for Flink. */
+public class DropPartitionAction implements Action {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DropPartitionAction.class);
+
+ private final TableCommit commit;
+
+ DropPartitionAction(Path tablePath, List<Map<String, String>> partitions) {
+ FileStoreTable table = FileStoreTableFactory.create(tablePath);
+ this.commit =
+ table.newCommit(UUID.randomUUID().toString()).withOverwritePartitions(partitions);
+ }
+
+ public static Optional<Action> create(String[] args) {
+ LOG.info("Drop partition job args: {}", String.join(" ", args));
+
+ MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+
+ if (params.has("help")) {
+ printHelp();
+ return Optional.empty();
+ }
+
+ Path tablePath = getTablePath(params);
+
+ if (tablePath == null) {
+ return Optional.empty();
+ }
+
+ if (!params.has("partition")) {
+ LOG.info(
+ "Action drop-partition must specify the partitions to be dropped.\n"
+ + "Run drop-partition --help for help.");
+ System.err.println(
+ "Action drop-partition must specify the partitions to be dropped.\n"
+ + "Run drop-partition --help for help.");
+
+ return Optional.empty();
+ }
+
+ List<Map<String, String>> partitions = getPartitions(params);
+ if (partitions == null) {
+ return Optional.empty();
+ }
+
+ return Optional.of(new DropPartitionAction(tablePath, partitions));
+ }
+
+ private static void printHelp() {
+ System.out.println(
+ "Action \"drop-partition\" drops data of specified partitions for a table.");
+ System.out.println();
+
+ System.out.println("Syntax:");
+ System.out.println(
+ " drop-partition --warehouse <warehouse-path> --database <database-name> "
+ + "--table <table-name> --partition <partition-name> [--partition <partition-name> ...]");
+ System.out.println(
+ " drop-partition --path <table-path> --partition <partition-name> [--partition <partition-name> ...]");
+ System.out.println();
+
+ System.out.println("Partition name syntax:");
+ System.out.println(" key1=value1,key2=value2,...");
+ System.out.println();
+
+ System.out.println("Examples:");
+ System.out.println(
+ " drop-partition --warehouse hdfs:///path/to/warehouse --database test_db --table test_table --partition dt=20221126,hh=08");
+ System.out.println(
+ " drop-partition --path hdfs:///path/to/warehouse/test_db.db/test_table --partition dt=20221126,hh=08 --partition dt=20221127,hh=09");
+ }
+
+ @Override
+ public void run() throws Exception {
+ commit.commit(new ArrayList<>());
+ }
+}
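The constructor is package-private, so a direct (non-CLI) invocation like the following sketch assumes same-package code; the table path and partition spec are illustrative:

```java
package org.apache.flink.table.store.connector.action;

import org.apache.flink.core.fs.Path;

import java.util.Collections;

public class DropPartitionSketch {
    public static void main(String[] args) throws Exception {
        // Equivalent to:
        //   drop-partition --path /tmp/warehouse/test_db.db/test_table --partition dt=20221126
        new DropPartitionAction(
                        new Path("/tmp/warehouse/test_db.db/test_table"),
                        Collections.singletonList(
                                Collections.singletonMap("dt", "20221126")))
                .run();
    }
}
```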
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/FlinkActions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/FlinkActions.java
index ab907784..d41e8c91 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/FlinkActions.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/FlinkActions.java
@@ -18,13 +18,12 @@
package org.apache.flink.table.store.connector.action;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import java.util.Arrays;
import java.util.Optional;
+import static org.apache.flink.table.store.connector.action.Action.Factory.printHelp;
+
/** Table maintenance actions for Flink. */
public class FlinkActions {
@@ -46,35 +45,12 @@ public class FlinkActions {
System.exit(1);
}
- String action = args[0].toLowerCase();
- if ("compact".equals(action)) {
- runCompact(Arrays.copyOfRange(args, 1, args.length));
- } else {
- System.err.println("Unknown action \"" + action + "\"");
- printHelp();
- System.exit(1);
- }
- }
+ Optional<Action> action = Action.Factory.create(args);
- private static void runCompact(String[] args) throws Exception {
- Optional<CompactAction> action = CompactAction.create(MultipleParameterTool.fromArgs(args));
if (action.isPresent()) {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- action.get().build(env);
- env.execute("Compact job: " + String.join(" ", args));
+ action.get().run();
} else {
System.exit(1);
}
}
-
- private static void printHelp() {
- System.out.println("Usage: <action> [OPTIONS]");
- System.out.println();
-
- System.out.println("Available actions:");
- System.out.println(" compact");
- System.out.println();
-
- System.out.println("For detailed options of each action, run <action> --help");
- }
}
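With the dispatch moved into `Action.Factory`, the entry point stays a thin wrapper. A hypothetical programmatic call equivalent to the CLI usage above (assuming `main` keeps its `throws Exception` signature; note it exits with status 1 on bad arguments, and the path and spec here are illustrative):

```java
public class FlinkActionsSketch {
    public static void main(String[] args) throws Exception {
        // Same effect as: flink run -c ...FlinkActions <dist-jar> drop-partition ...
        org.apache.flink.table.store.connector.action.FlinkActions.main(
                new String[] {
                    "drop-partition",
                    "--path", "/tmp/warehouse/test_db.db/test_table",
                    "--partition", "dt=20230117"
                });
    }
}
```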
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/ActionITCaseBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/ActionITCaseBase.java
new file mode 100644
index 00000000..1d25e2f8
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/ActionITCaseBase.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.action;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.connector.util.AbstractTestBase;
+import org.apache.flink.table.store.data.DataFormatTestUtil;
+import org.apache.flink.table.store.data.GenericRow;
+import org.apache.flink.table.store.data.InternalRow;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderUtils;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.types.RowType;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/** {@link Action} test base. */
+public class ActionITCaseBase extends AbstractTestBase {
+
+ protected Path tablePath;
+ protected String commitUser;
+
+ protected SnapshotManager snapshotManager;
+ protected TableWrite write;
+ protected TableCommit commit;
+
+ private long incrementalIdentifier;
+
+ @BeforeEach
+ public void before() throws IOException {
+ tablePath = new Path(getTempDirPath());
+ commitUser = UUID.randomUUID().toString();
+ incrementalIdentifier = 0;
+ }
+
+ @AfterEach
+ public void after() throws Exception {
+ if (write != null) {
+ write.close();
+ }
+ if (commit != null) {
+ commit.close();
+ }
+ }
+
+ protected FileStoreTable createFileStoreTable(
+ RowType rowType,
+ List<String> partitionKeys,
+ List<String> primaryKeys,
+ Map<String, String> options)
+ throws Exception {
+ SchemaManager schemaManager = new SchemaManager(tablePath);
+ TableSchema tableSchema =
+ schemaManager.commitNewVersion(
+ new UpdateSchema(rowType, partitionKeys, primaryKeys, options, ""));
+ return FileStoreTableFactory.create(tablePath, tableSchema);
+ }
+
+ protected GenericRow rowData(Object... values) {
+ return GenericRow.of(values);
+ }
+
+ protected void writeData(GenericRow... data) throws Exception {
+ for (GenericRow d : data) {
+ write.write(d);
+ }
+ commit.commit(incrementalIdentifier, write.prepareCommit(true, incrementalIdentifier));
+ incrementalIdentifier++;
+ }
+
+ protected List<String> getResult(TableRead read, List<Split> splits, RowType rowType)
+ throws Exception {
+ RecordReader<InternalRow> recordReader = read.createReader(splits);
+ List<String> result = new ArrayList<>();
+ RecordReaderUtils.forEachRemaining(
+ recordReader, row -> result.add(DataFormatTestUtil.rowDataToString(row, rowType)));
+ return result;
+ }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
index 996eb688..2d1c94a7 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
@@ -19,94 +19,69 @@
package org.apache.flink.table.store.connector.action;
import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.data.BinaryString;
-import org.apache.flink.table.store.data.GenericRow;
-import org.apache.flink.table.store.data.InternalRow;
import org.apache.flink.table.store.file.Snapshot;
-import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
-import org.apache.flink.table.store.file.schema.SchemaManager;
-import org.apache.flink.table.store.file.schema.TableSchema;
-import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.file.utils.RecordReaderIterator;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.FileStoreTableFactory;
-import org.apache.flink.table.store.table.sink.TableCommit;
-import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.flink.table.store.table.source.DataTableScan;
-import org.apache.flink.table.store.table.source.Split;
-import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
import org.apache.flink.table.store.types.DataType;
import org.apache.flink.table.store.types.DataTypes;
import org.apache.flink.table.store.types.RowType;
-import org.apache.flink.test.util.AbstractTestBase;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
/** IT cases for {@link CompactAction}. */
-public class CompactActionITCase extends AbstractTestBase {
+public class CompactActionITCase extends ActionITCaseBase {
- private static final RowType ROW_TYPE =
- RowType.of(
- new DataType[] {
- DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()
- },
- new String[] {"k", "v", "hh", "dt"});
-
- private Path tablePath;
- private String commitUser;
+ private static final DataType[] FIELD_TYPES =
+ new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()};
- @Before
- public void before() throws IOException {
- tablePath = new Path(TEMPORARY_FOLDER.newFolder().toString());
- commitUser = UUID.randomUUID().toString();
- }
+ private static final RowType ROW_TYPE =
+ RowType.of(FIELD_TYPES, new String[] {"k", "v", "hh", "dt"});
- @Test(timeout = 60000)
+ @Test
+ @Timeout(60)
public void testBatchCompact() throws Exception {
Map<String, String> options = new HashMap<>();
options.put(CoreOptions.WRITE_ONLY.key(), "true");
- FileStoreTable table = createFileStoreTable(options);
- SnapshotManager snapshotManager = table.snapshotManager();
- TableWrite write = table.newWrite(commitUser);
- TableCommit commit = table.newCommit(commitUser);
-
- write.write(rowData(1, 100, 15, BinaryString.fromString("20221208")));
- write.write(rowData(1, 100, 16, BinaryString.fromString("20221208")));
- write.write(rowData(1, 100, 15, BinaryString.fromString("20221209")));
- commit.commit(0, write.prepareCommit(true, 0));
-
- write.write(rowData(2, 200, 15, BinaryString.fromString("20221208")));
- write.write(rowData(2, 200, 16, BinaryString.fromString("20221208")));
- write.write(rowData(2, 200, 15, BinaryString.fromString("20221209")));
- commit.commit(1, write.prepareCommit(true, 1));
+ FileStoreTable table =
+ createFileStoreTable(
+ ROW_TYPE,
+ Arrays.asList("dt", "hh"),
+ Arrays.asList("dt", "hh", "k"),
+ options);
+ snapshotManager = table.snapshotManager();
+ write = table.newWrite(commitUser);
+ commit = table.newCommit(commitUser);
+
+ writeData(
+ rowData(1, 100, 15, BinaryString.fromString("20221208")),
+ rowData(1, 100, 16, BinaryString.fromString("20221208")),
+ rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+ writeData(
+ rowData(2, 100, 15, BinaryString.fromString("20221208")),
+ rowData(2, 100, 16, BinaryString.fromString("20221208")),
+ rowData(2, 100, 15, BinaryString.fromString("20221209")));
Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
- Assert.assertEquals(2, snapshot.id());
- Assert.assertEquals(Snapshot.CommitKind.APPEND, snapshot.commitKind());
-
- write.close();
- commit.close();
+ Assertions.assertEquals(2, snapshot.id());
+ Assertions.assertEquals(Snapshot.CommitKind.APPEND, snapshot.commitKind());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -115,23 +90,24 @@ public class CompactActionITCase extends AbstractTestBase {
env.execute();
snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
- Assert.assertEquals(3, snapshot.id());
- Assert.assertEquals(Snapshot.CommitKind.COMPACT, snapshot.commitKind());
+ Assertions.assertEquals(3, snapshot.id());
+ Assertions.assertEquals(Snapshot.CommitKind.COMPACT, snapshot.commitKind());
DataTableScan.DataFilePlan plan = table.newScan().plan();
- Assert.assertEquals(3, plan.splits().size());
+ Assertions.assertEquals(3, plan.splits().size());
for (DataSplit split : plan.splits) {
if (split.partition().getInt(1) == 15) {
// compacted
- Assert.assertEquals(1, split.files().size());
+ Assertions.assertEquals(1, split.files().size());
} else {
// not compacted
- Assert.assertEquals(2, split.files().size());
+ Assertions.assertEquals(2, split.files().size());
}
}
}
- @Test(timeout = 60000)
+ @Test
+ @Timeout(60)
public void testStreamingCompact() throws Exception {
Map<String, String> options = new HashMap<>();
options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
@@ -142,27 +118,32 @@ public class CompactActionITCase extends AbstractTestBase {
options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
- FileStoreTable table = createFileStoreTable(options);
- SnapshotManager snapshotManager = table.snapshotManager();
- TableWrite write = table.newWrite(commitUser);
- TableCommit commit = table.newCommit(commitUser);
+ FileStoreTable table =
+ createFileStoreTable(
+ ROW_TYPE,
+ Arrays.asList("dt", "hh"),
+ Arrays.asList("dt", "hh", "k"),
+ options);
+ snapshotManager = table.snapshotManager();
+ write = table.newWrite(commitUser);
+ commit = table.newCommit(commitUser);
// base records
- write.write(rowData(1, 100, 15, BinaryString.fromString("20221208")));
- write.write(rowData(1, 100, 16, BinaryString.fromString("20221208")));
- write.write(rowData(1, 100, 15, BinaryString.fromString("20221209")));
- commit.commit(0, write.prepareCommit(true, 0));
+ writeData(
+ rowData(1, 100, 15, BinaryString.fromString("20221208")),
+ rowData(1, 100, 16, BinaryString.fromString("20221208")),
+ rowData(1, 100, 15, BinaryString.fromString("20221209")));
Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
- Assert.assertEquals(1, snapshot.id());
- Assert.assertEquals(Snapshot.CommitKind.APPEND, snapshot.commitKind());
+ Assertions.assertEquals(1, snapshot.id());
+ Assertions.assertEquals(Snapshot.CommitKind.APPEND, snapshot.commitKind());
// no full compaction has happened, so plan should be empty
SnapshotEnumerator snapshotEnumerator =
ContinuousDataFileSnapshotEnumerator.create(table, table.newScan(), null);
DataTableScan.DataFilePlan plan = snapshotEnumerator.enumerate();
- Assert.assertEquals(1, (long) plan.snapshotId);
- Assert.assertTrue(plan.splits().isEmpty());
+ Assertions.assertEquals(1, (long) plan.snapshotId);
+ Assertions.assertTrue(plan.splits().isEmpty());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
@@ -183,20 +164,18 @@ public class CompactActionITCase extends AbstractTestBase {
// first full compaction
List<String> actual = new ArrayList<>();
while (plan != null) {
- actual.addAll(getResult(table.newRead(), plan.splits()));
+ actual.addAll(getResult(table.newRead(), plan.splits(), ROW_TYPE));
plan = snapshotEnumerator.enumerate();
}
actual.sort(String::compareTo);
- Assert.assertEquals(Arrays.asList("+I 1|100|15|20221208", "+I 1|100|15|20221209"), actual);
+ Assertions.assertEquals(
+ Arrays.asList("+I[1, 100, 15, 20221208]", "+I[1, 100, 15, 20221209]"), actual);
// incremental records
- write.write(rowData(1, 101, 15, BinaryString.fromString("20221208")));
- write.write(rowData(1, 101, 16, BinaryString.fromString("20221208")));
- write.write(rowData(1, 101, 15, BinaryString.fromString("20221209")));
- commit.commit(1, write.prepareCommit(true, 1));
-
- write.close();
- commit.close();
+ writeData(
+ rowData(1, 101, 15, BinaryString.fromString("20221208")),
+ rowData(1, 101, 16, BinaryString.fromString("20221208")),
+ rowData(1, 101, 15, BinaryString.fromString("20221209")));
while (true) {
plan = snapshotEnumerator.enumerate();
@@ -209,20 +188,20 @@ public class CompactActionITCase extends AbstractTestBase {
// second full compaction
actual = new ArrayList<>();
while (plan != null) {
- actual.addAll(getResult(table.newRead(), plan.splits()));
+ actual.addAll(getResult(table.newRead(), plan.splits(), ROW_TYPE));
plan = snapshotEnumerator.enumerate();
}
actual.sort(String::compareTo);
- Assert.assertEquals(
+ Assertions.assertEquals(
Arrays.asList(
- "+U 1|101|15|20221208",
- "+U 1|101|15|20221209",
- "-U 1|100|15|20221208",
- "-U 1|100|15|20221209"),
+ "+U[1, 101, 15, 20221208]",
+ "+U[1, 101, 15, 20221209]",
+ "-U[1, 100, 15, 20221208]",
+ "-U[1, 100, 15, 20221209]"),
actual);
// assert dedicated compact job will expire snapshots
- Assert.assertEquals(
+ Assertions.assertEquals(
snapshotManager.latestSnapshotId() - 2,
(long) snapshotManager.earliestSnapshotId());
}
@@ -238,47 +217,4 @@ public class CompactActionITCase extends AbstractTestBase {
return Arrays.asList(partition1, partition2);
}
-
- private GenericRow rowData(Object... values) {
- return GenericRow.of(values);
- }
-
- private List<String> getResult(TableRead read, List<Split> splits) throws Exception {
- List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new ArrayList<>();
- for (Split split : splits) {
- readers.add(() -> read.createReader(split));
- }
- RecordReader<InternalRow> recordReader = ConcatRecordReader.create(readers);
- RecordReaderIterator<InternalRow> iterator = new RecordReaderIterator<>(recordReader);
- List<String> result = new ArrayList<>();
- while (iterator.hasNext()) {
- InternalRow rowData = iterator.next();
- result.add(rowDataToString(rowData));
- }
- iterator.close();
- return result;
- }
-
- private String rowDataToString(InternalRow rowData) {
- return String.format(
- "%s %d|%d|%d|%s",
- rowData.getRowKind().shortString(),
- rowData.getInt(0),
- rowData.getInt(1),
- rowData.getInt(2),
- rowData.getString(3).toString());
- }
-
- private FileStoreTable createFileStoreTable(Map<String, String> options) throws Exception {
- SchemaManager schemaManager = new SchemaManager(tablePath);
- TableSchema tableSchema =
- schemaManager.commitNewVersion(
- new UpdateSchema(
- ROW_TYPE,
- Arrays.asList("dt", "hh"),
- Arrays.asList("dt", "hh", "k"),
- options,
- ""));
- return FileStoreTableFactory.create(tablePath, tableSchema);
- }
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionITCase.java
new file mode 100644
index 00000000..60491b7b
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionITCase.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.action;
+
+import org.apache.flink.table.store.data.BinaryString;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.types.DataType;
+import org.apache.flink.table.store.types.DataTypes;
+import org.apache.flink.table.store.types.RowType;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for {@link DropPartitionAction}. */
+public class DropPartitionITCase extends ActionITCaseBase {
+
+ private static final DataType[] FIELD_TYPES =
+ new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.INT()};
+
+ private static final RowType ROW_TYPE =
+ RowType.of(FIELD_TYPES, new String[] {"partKey0", "partKey1", "dt", "value"});
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testDropPartitionWithSinglePartitionKey(boolean hasPk) throws Exception {
+ FileStoreTable table = prepareTable(hasPk);
+
+ new DropPartitionAction(
+ tablePath,
+ Collections.singletonList(Collections.singletonMap("partKey0", "0")))
+ .run();
+
+ Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+ assertThat(snapshot.id()).isEqualTo(5);
+ assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
+
+ DataTableScan.DataFilePlan plan = table.newScan().plan();
+ assertThat(plan.splits().size()).isEqualTo(2);
+ List<String> actual = getResult(table.newRead(), plan.splits(), ROW_TYPE);
+
+ List<String> expected;
+ if (hasPk) {
+ expected =
+ Arrays.asList(
+ "+I[1, 0, 2023-01-17, 5]",
+ "+I[1, 1, 2023-01-18, 82]",
+ "+I[1, 1, 2023-01-19, 90]",
+ "+I[1, 1, 2023-01-20, 97]");
+ } else {
+ expected =
+ Arrays.asList(
+ "+I[1, 0, 2023-01-17, 2]",
+ "+I[1, 0, 2023-01-17, 3]",
+ "+I[1, 0, 2023-01-17, 5]",
+ "+I[1, 1, 2023-01-18, 82]",
+ "+I[1, 1, 2023-01-19, 90]",
+ "+I[1, 1, 2023-01-20, 97]");
+ }
+
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testDropPartitionWithMultiplePartitionKey(boolean hasPk) throws Exception {
+ FileStoreTable table = prepareTable(hasPk);
+
+ Map<String, String> partitions0 = new HashMap<>();
+ partitions0.put("partKey0", "0");
+ partitions0.put("partKey1", "1");
+
+ Map<String, String> partitions1 = new HashMap<>();
+ partitions1.put("partKey0", "1");
+ partitions1.put("partKey1", "0");
+
+ new DropPartitionAction(tablePath, Arrays.asList(partitions0, partitions1)).run();
+
+ Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+ assertThat(snapshot.id()).isEqualTo(5);
+ assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
+
+ DataTableScan.DataFilePlan plan = table.newScan().plan();
+ assertThat(plan.splits().size()).isEqualTo(2);
+ List<String> actual = getResult(table.newRead(), plan.splits(), ROW_TYPE);
+
+ List<String> expected;
+ if (hasPk) {
+ expected =
+ Arrays.asList(
+ "+I[0, 0, 2023-01-12, 102]",
+ "+I[0, 0, 2023-01-13, 103]",
+ "+I[1, 1, 2023-01-18, 82]",
+ "+I[1, 1, 2023-01-19, 90]",
+ "+I[1, 1, 2023-01-20, 97]");
+ } else {
+ expected =
+ Arrays.asList(
+ "+I[0, 0, 2023-01-12, 101]",
+ "+I[0, 0, 2023-01-12, 102]",
+ "+I[0, 0, 2023-01-13, 103]",
+ "+I[1, 1, 2023-01-18, 82]",
+ "+I[1, 1, 2023-01-19, 90]",
+ "+I[1, 1, 2023-01-20, 97]");
+ }
+
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ private FileStoreTable prepareTable(boolean hasPk) throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ ROW_TYPE,
+ Arrays.asList("partKey0", "partKey1"),
+ hasPk
+ ? Arrays.asList("partKey0", "partKey1", "dt")
+ : Collections.emptyList(),
+ new HashMap<>());
+ snapshotManager = table.snapshotManager();
+ write = table.newWrite(commitUser);
+ commit = table.newCommit(commitUser);
+
+ // prepare data
+ writeData(
+ rowData(0, 0, BinaryString.fromString("2023-01-12"), 101),
+ rowData(0, 0, BinaryString.fromString("2023-01-12"), 102),
+ rowData(0, 0, BinaryString.fromString("2023-01-13"), 103));
+
+ writeData(
+ rowData(0, 1, BinaryString.fromString("2023-01-14"), 110),
+ rowData(0, 1, BinaryString.fromString("2023-01-15"), 120),
+ rowData(0, 1, BinaryString.fromString("2023-01-16"), 130));
+
+ writeData(
+ rowData(1, 0, BinaryString.fromString("2023-01-17"), 2),
+ rowData(1, 0, BinaryString.fromString("2023-01-17"), 3),
+ rowData(1, 0, BinaryString.fromString("2023-01-17"), 5));
+
+ writeData(
+ rowData(1, 1, BinaryString.fromString("2023-01-18"), 82),
+ rowData(1, 1, BinaryString.fromString("2023-01-19"), 90),
+ rowData(1, 1, BinaryString.fromString("2023-01-20"), 97));
+
+ Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+
+ assertThat(snapshot.id()).isEqualTo(4);
+ assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+
+ return table;
+ }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/util/AbstractTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/util/AbstractTestBase.java
index 8cded1e1..e127f913 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/util/AbstractTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/util/AbstractTestBase.java
@@ -36,8 +36,7 @@ import java.util.UUID;
/** Similar to Flink's AbstractTestBase but using Junit5. */
public class AbstractTestBase extends TestLogger {
- private static final Logger LOG =
- LoggerFactory.getLogger(org.apache.flink.test.util.AbstractTestBase.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractTestBase.class);
private static final int DEFAULT_PARALLELISM = 4;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java
index c2c0d8c5..aa5009a6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.file.operation;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -37,16 +38,24 @@ public interface FileStoreCommit {
/** Commit from manifest committable. */
void commit(ManifestCommittable committable, Map<String, String> properties);
+ /** Overwrite a single partition from manifest committable. */
+ default void overwrite(
+ Map<String, String> partition,
+ ManifestCommittable committable,
+ Map<String, String> properties) {
+ overwrite(Collections.singletonList(partition), committable, properties);
+ }
+
/**
- * Overwrite from manifest committable and partition.
+ * Overwrite multiple partitions from manifest committable.
*
- * @param partition A single partition maps each partition key to a partition value. Depending
- * on the user-defined statement, the partition might not include all partition keys. Also
- * note that this partition does not necessarily equal to the partitions of the newly added
- * key-values. This is just the partition to be cleaned up.
+ * @param partitions A list of partition {@link Map}s, each mapping partition keys to
+ * partition values. Depending on the user-defined statement, a partition might not include
+ * all partition keys. Also note that these partitions do not necessarily equal the
+ * partitions of the newly added key-values; they are just the partitions to be cleaned up.
*/
void overwrite(
- Map<String, String> partition,
+ List<Map<String, String>> partitions,
ManifestCommittable committable,
Map<String, String> properties);
}
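The single-partition `overwrite` is now a default method that wraps its argument and delegates, so existing callers compile unchanged. A sketch of the equivalence (the `FileStoreCommit` and `ManifestCommittable` instances are assumed to exist):

```java
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.operation.FileStoreCommit;

import java.util.Collections;
import java.util.HashMap;

class OverwriteEquivalenceSketch {
    static void drop(FileStoreCommit commit, ManifestCommittable committable) {
        // The old single-partition call...
        commit.overwrite(
                Collections.singletonMap("dt", "20221126"), committable, new HashMap<>());
        // ...is now shorthand for the list variant:
        commit.overwrite(
                Collections.singletonList(Collections.singletonMap("dt", "20221126")),
                committable,
                new HashMap<>());
    }
}
```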
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 4c0d1b8f..0d08bb13 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -240,13 +240,15 @@ public class FileStoreCommitImpl implements FileStoreCommit {
@Override
public void overwrite(
- Map<String, String> partition,
+ List<Map<String, String>> partitions,
ManifestCommittable committable,
Map<String, String> properties) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Ready to overwrite partition "
- + partition.toString()
+ + partitions.stream()
+ .map(Object::toString)
+ .collect(Collectors.joining(","))
+ "\n"
+ committable.toString());
}
@@ -279,19 +281,35 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
// sanity check, all changes must be done within the given partition
- Predicate partitionFilter = PredicateBuilder.partition(partition, partitionType);
- if (partitionFilter != null) {
- for (ManifestEntry entry : appendTableFiles) {
- if (!partitionFilter.test(partitionObjectConverter.convert(entry.partition()))) {
- throw new IllegalArgumentException(
- "Trying to overwrite partition "
- + partition
- + ", but the changes in "
- + pathFactory.getPartitionString(entry.partition())
- + " does not belong to this partition");
+ List<Predicate> partitionFilters = new ArrayList<>();
+ for (Map<String, String> partition : partitions) {
+ Predicate partitionFilter = PredicateBuilder.partition(partition, partitionType);
+ if (partitionFilter != null) {
+ for (ManifestEntry entry : appendTableFiles) {
+ if (!partitionFilter.test(
+ partitionObjectConverter.convert(entry.partition()))) {
+ throw new IllegalArgumentException(
+ "Trying to overwrite partition "
+ + partition
+ + ", but the changes in "
+ + pathFactory.getPartitionString(entry.partition())
+ + " do not belong to this partition");
+ }
}
+
+ partitionFilters.add(partitionFilter);
}
}
+
+ Predicate partitionFilter;
+ if (partitionFilters.size() == 0) {
+ partitionFilter = null;
+ } else if (partitionFilters.size() == 1) {
+ partitionFilter = partitionFilters.get(0);
+ } else {
+ partitionFilter = PredicateBuilder.or(partitionFilters);
+ }
+
// overwrite new files
tryOverwrite(
partitionFilter,
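The per-spec filters are OR-combined before `tryOverwrite`, so dropping several partitions in one commit deletes files matching any spec, while an empty filter list falls back to `null`, i.e. overwrite the whole table. A condensed sketch of that combination step (import paths assumed from this module):

```java
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;

import java.util.List;

class PartitionFilterSketch {
    // 0 filters -> null (whole table), 1 -> as-is, n -> spec1 OR spec2 OR ...
    static Predicate combine(List<Predicate> partitionFilters) {
        if (partitionFilters.isEmpty()) {
            return null;
        }
        return partitionFilters.size() == 1
                ? partitionFilters.get(0)
                : PredicateBuilder.or(partitionFilters);
    }
}
```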
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
index dd6056d5..56e5905a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
@@ -39,7 +39,7 @@ public class TableCommit implements AutoCloseable {
private final FileStoreCommit commit;
@Nullable private final FileStoreExpire expire;
- @Nullable private Map<String, String> overwritePartition = null;
+ @Nullable private List<Map<String, String>> overwritePartitions = null;
@Nullable private Lock lock;
public TableCommit(FileStoreCommit commit, @Nullable FileStoreExpire expire) {
@@ -48,7 +48,15 @@ public class TableCommit implements AutoCloseable {
}
public TableCommit withOverwritePartition(@Nullable Map<String, String> overwritePartition) {
- this.overwritePartition = overwritePartition;
+ if (overwritePartition != null) {
+ this.overwritePartitions = Collections.singletonList(overwritePartition);
+ }
+ return this;
+ }
+
+ public TableCommit withOverwritePartitions(
+ @Nullable List<Map<String, String>> overwritePartitions) {
+ this.overwritePartitions = overwritePartitions;
return this;
}
@@ -81,7 +89,7 @@ public class TableCommit implements AutoCloseable {
}
public void commit(List<ManifestCommittable> committables) {
- if (overwritePartition == null) {
+ if (overwritePartitions == null) {
for (ManifestCommittable committable : committables) {
commit.commit(committable, new HashMap<>());
}
@@ -99,7 +107,7 @@ public class TableCommit implements AutoCloseable {
// TODO maybe it can be produced by CommitterOperator
committable = new ManifestCommittable(Long.MAX_VALUE);
}
- commit.overwrite(overwritePartition, committable, new HashMap<>());
+ commit.overwrite(overwritePartitions, committable, new HashMap<>());
}
if (expire != null) {
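Taken together with `DropPartitionAction`, the new `withOverwritePartitions` plus an empty committable list is all a purge needs. A minimal sketch, assuming an existing `FileStoreTable` and an illustrative partition spec:

```java
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.TableCommit;

import java.util.ArrayList;
import java.util.Collections;
import java.util.UUID;

class PurgePartitionsSketch {
    static void purge(FileStoreTable table) throws Exception {
        TableCommit commit =
                table.newCommit(UUID.randomUUID().toString())
                        .withOverwritePartitions(
                                Collections.singletonList(
                                        Collections.singletonMap("dt", "20221126")));
        // Overwriting the partitions with no new files drops their data.
        commit.commit(new ArrayList<>());
        commit.close();
    }
}
```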
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
index 956d2051..308474df 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.tests;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +35,24 @@ public class FlinkActionsE2eTest extends E2eTestBase {
super(true, false);
}
+ private String warehousePath;
+ private String catalogDdl;
+ private String useCatalogCmd;
+
+ @BeforeEach
+ public void setUp() {
+ warehousePath = TEST_DATA_DIR + "/" + UUID.randomUUID() + ".store";
+ catalogDdl =
+ String.format(
+ "CREATE CATALOG ts_catalog WITH (\n"
+ + " 'type' = 'table-store',\n"
+ + " 'warehouse' = '%s'\n"
+ + ");",
+ warehousePath);
+
+ useCatalogCmd = "USE CATALOG ts_catalog;";
+ }
+
@Test
public void testCompact() throws Exception {
String topicName = "ts-topic-" + UUID.randomUUID();
@@ -57,17 +76,6 @@ public class FlinkActionsE2eTest extends E2eTestBase {
+ ");",
topicName);
- String warehousePath = TEST_DATA_DIR + "/" + UUID.randomUUID() + ".store";
- String catalogDdl =
- String.format(
- "CREATE CATALOG ts_catalog WITH (\n"
- + " 'type' = 'table-store',\n"
- + " 'warehouse' = '%s'\n"
- + ");",
- warehousePath);
-
- String useCatalogCmd = "USE CATALOG ts_catalog;";
-
String tableDdl =
"CREATE TABLE IF NOT EXISTS ts_table (\n"
+ " dt STRING,\n"
@@ -132,6 +140,65 @@ public class FlinkActionsE2eTest extends E2eTestBase {
checkResult("20221205, 1, 101", "20221206, 1, 101");
}
+ @Test
+ public void testDropPartition() throws Exception {
+ String tableDdl =
+ "CREATE TABLE IF NOT EXISTS ts_table (\n"
+ + " dt STRING,\n"
+ + " k0 INT,\n"
+ + " k1 INT,\n"
+ + " v INT,\n"
+ + " PRIMARY KEY (dt, k0, k1) NOT ENFORCED\n"
+ + ") PARTITIONED BY (k0, k1);";
+
+ String insert =
+ "INSERT INTO ts_table VALUES ('2023-01-13', 0, 0, 15), ('2023-01-14', 0, 0, 19), ('2023-01-13', 0, 0, 39), "
+ + "('2023-01-15', 0, 1, 34), ('2023-01-15', 0, 1, 56), ('2023-01-15', 0, 1, 37), "
+ + "('2023-01-16', 1, 0, 25), ('2023-01-17', 1, 0, 50), ('2023-01-18', 1, 0, 75), "
+ + "('2023-01-19', 1, 1, 23), ('2023-01-20', 1, 1, 28), ('2023-01-21', 1, 1, 31);";
+
+ runSql("SET 'table.dml-sync' = 'true';\n" + insert, catalogDdl, useCatalogCmd, tableDdl);
+
+ // run drop partition job
+ Container.ExecResult execResult =
+ jobManager.execInContainer(
+ "bin/flink",
+ "run",
+ "-c",
+ "org.apache.flink.table.store.connector.action.FlinkActions",
+ "--detached",
+ "lib/flink-table-store.jar",
+ "drop-partition",
+ "--warehouse",
+ warehousePath,
+ "--database",
+ "default",
+ "--table",
+ "ts_table",
+ "--partition",
+ "k0=0,k1=1",
+ "--partition",
+ "k0=1,k1=0");
+ LOG.info(execResult.getStdout());
+ LOG.info(execResult.getStderr());
+
+ // read all data from table store
+ runSql(
+ "INSERT INTO result1 SELECT * FROM ts_table;",
+ catalogDdl,
+ useCatalogCmd,
+ tableDdl,
+ createResultSink("result1", "dt STRING, k0 INT, k1 INT, v INT"));
+
+ // check the remaining data
+ checkResult(
+ "2023-01-13, 0, 0, 39",
+ "2023-01-14, 0, 0, 19",
+ "2023-01-19, 1, 1, 23",
+ "2023-01-20, 1, 1, 28",
+ "2023-01-21, 1, 1, 31");
+ }
+
private void runSql(String sql, String... ddls) throws Exception {
runSql(String.join("\n", ddls) + "\n" + sql);
}