Posted to commits@flink.apache.org by cz...@apache.org on 2022/12/19 02:45:51 UTC
[flink-table-store] branch master updated: [FLINK-30212] Introduce a TableStoreCompactJob class for users to submit compact jobs in Table Store
This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 15af711a [FLINK-30212] Introduce a TableStoreCompactJob class for users to submit compact jobs in Table Store
15af711a is described below
commit 15af711a1bf50b2d612b439a27225c8ad42acb1d
Author: tsreaper <ts...@gmail.com>
AuthorDate: Mon Dec 19 10:45:47 2022 +0800
[FLINK-30212] Introduce a TableStoreCompactJob class for users to submit compact jobs in Table Store
This closes #437.
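For context, here is a minimal sketch of how a user might submit a dedicated compact job through the Java API added by this commit. The class name, warehouse path and partition values are illustrative; the target table is assumed to have been created with 'write.compaction-skip' = 'true' so that its own writers leave compaction to this job:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.store.connector.action.FlinkActions;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SubmitCompactJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // BATCH compacts the current data once and finishes;
        // STREAMING runs a continuous compactor instead.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Optionally restrict compaction to specific partitions.
        Map<String, String> partition = new HashMap<>();
        partition.put("dt", "20221126");
        partition.put("hh", "08");

        FlinkActions.compact(new Path("hdfs:///path/to/warehouse/test_db.db/test_table"))
                .withPartitions(Collections.singletonList(partition))
                .build(env);
        env.execute("Compact job");
    }
}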
---
.../store/connector/action/CompactAction.java | 168 +++++++++++++
.../table/store/connector/action/FlinkActions.java | 80 ++++++
.../table/store/connector/sink/FlinkSink.java | 3 +-
.../sink/FullChangelogStoreSinkWrite.java | 25 +-
.../ChangelogWithKeyFileStoreTableITCase.java | 51 ++++
.../connector/action/CompactActionITCase.java | 269 +++++++++++++++++++++
.../table/store/table/FileDataFilterTestBase.java | 1 -
.../flink/table/store/tests/E2eTestBase.java | 2 +-
.../table/store/tests/FlinkActionsE2eTest.java | 138 +++++++++++
9 files changed, 724 insertions(+), 13 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java
new file mode 100644
index 00000000..7b31df19
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.action;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.connector.sink.CompactorSinkBuilder;
+import org.apache.flink.table.store.connector.source.CompactorSourceBuilder;
+import org.apache.flink.table.store.connector.utils.StreamExecutionEnvironmentUtils;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Table compact action for Flink. */
+public class CompactAction {
+
+ private final CompactorSourceBuilder sourceBuilder;
+ private final CompactorSinkBuilder sinkBuilder;
+
+ CompactAction(Path tablePath) {
+ Configuration tableOptions = new Configuration();
+ tableOptions.set(CoreOptions.PATH, tablePath.toString());
+ tableOptions.set(CoreOptions.WRITE_COMPACTION_SKIP, false);
+ FileStoreTable table = FileStoreTableFactory.create(tableOptions);
+
+ sourceBuilder = new CompactorSourceBuilder(tablePath.toString(), table);
+ sinkBuilder = new CompactorSinkBuilder(table);
+ }
+
+ // ------------------------------------------------------------------------
+ // Java API
+ // ------------------------------------------------------------------------
+
+ public CompactAction withPartitions(List<Map<String, String>> partitions) {
+ sourceBuilder.withPartitions(partitions);
+ return this;
+ }
+
+ public void build(StreamExecutionEnvironment env) {
+ ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
+ boolean isStreaming =
+ conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
+
+ DataStreamSource<RowData> source =
+ sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();
+ sinkBuilder.withInput(source).build();
+ }
+
+ // ------------------------------------------------------------------------
+ // Flink run methods
+ // ------------------------------------------------------------------------
+
+ public static Optional<CompactAction> create(MultipleParameterTool params) {
+ if (params.has("help")) {
+ printHelp();
+ return Optional.empty();
+ }
+
+ String warehouse = params.get("warehouse");
+ String database = params.get("database");
+ String table = params.get("table");
+ String path = params.get("path");
+
+ Path tablePath = null;
+ int count = 0;
+ if (warehouse != null || database != null || table != null) {
+ if (warehouse == null || database == null || table == null) {
+ System.err.println(
+ "Warehouse, database and table must be specified all at once.\n"
+ + "Run compact --help for help.");
+ return Optional.empty();
+ }
+ tablePath = new Path(new Path(warehouse, database + ".db"), table);
+ count++;
+ }
+ if (path != null) {
+ tablePath = new Path(path);
+ count++;
+ }
+
+ if (count != 1) {
+ System.err.println(
+ "Please specify either \"warehouse, database and table\" or \"path\".\n"
+ + "Run compact --help for help.");
+ return Optional.empty();
+ }
+
+ CompactAction action = new CompactAction(tablePath);
+
+ if (params.has("partition")) {
+ List<Map<String, String>> partitions = new ArrayList<>();
+ for (String partition : params.getMultiParameter("partition")) {
+ Map<String, String> kvs = new HashMap<>();
+ for (String kvString : partition.split(",")) {
+ String[] kv = kvString.split("=");
+ if (kv.length != 2) {
+ System.err.print(
+ "Invalid key-value pair \""
+ + kvString
+ + "\".\n"
+ + "Run compact --help for help.");
+ return Optional.empty();
+ }
+ kvs.put(kv[0], kv[1]);
+ }
+ partitions.add(kvs);
+ }
+ action.withPartitions(partitions);
+ }
+
+ return Optional.of(action);
+ }
+
+ public static void printHelp() {
+ System.out.println(
+ "Action \"compact\" runs a dedicated job for compacting specified table.");
+ System.out.println();
+
+ System.out.println("Syntax:");
+ System.out.println(
+ " compact --warehouse <warehouse-path> --database <database-name> "
+ + "--table <table-name> [--partition <partition-name>]");
+ System.out.println(" compact --path <table-path> [--partition <partition-name>]");
+ System.out.println();
+
+ System.out.println("Partition name syntax:");
+ System.out.println(" key1=value1,key2=value2,...");
+ System.out.println();
+
+ System.out.println("Examples:");
+ System.out.println(
+ " compact --warehouse hdfs:///path/to/warehouse --database test_db --table test_table");
+ System.out.println(
+ " compact --path hdfs:///path/to/warehouse/test_db.db/test_table --partition dt=20221126,hh=08");
+ System.out.println(
+ " compact --warehouse hdfs:///path/to/warehouse --database test_db --table test_table "
+ + "--partition dt=20221126,hh=08 --partition dt=20221127,hh=09");
+ }
+}
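For clarity, a hedged sketch of the List<Map<String, String>> that create() above builds for "--partition dt=20221126,hh=08 --partition dt=20221127,hh=09" (values are illustrative); the same shape is what withPartitions(...) expects from Java API users:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionSpecExample {
    // Each --partition flag becomes one map; each key=value pair one entry.
    public static List<Map<String, String>> specifiedPartitions() {
        List<Map<String, String>> partitions = new ArrayList<>();

        Map<String, String> p1 = new HashMap<>();
        p1.put("dt", "20221126");
        p1.put("hh", "08");
        partitions.add(p1);

        Map<String, String> p2 = new HashMap<>();
        p2.put("dt", "20221127");
        p2.put("hh", "09");
        partitions.add(p2);

        return partitions;
    }
}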
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/FlinkActions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/FlinkActions.java
new file mode 100644
index 00000000..ab907784
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/FlinkActions.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.action;
+
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+/** Table maintenance actions for Flink. */
+public class FlinkActions {
+
+ // ------------------------------------------------------------------------
+ // Java API
+ // ------------------------------------------------------------------------
+
+ public static CompactAction compact(Path tablePath) {
+ return new CompactAction(tablePath);
+ }
+
+ // ------------------------------------------------------------------------
+ // Flink run methods
+ // ------------------------------------------------------------------------
+
+ public static void main(String[] args) throws Exception {
+ if (args.length < 1) {
+ printHelp();
+ System.exit(1);
+ }
+
+ String action = args[0].toLowerCase();
+ if ("compact".equals(action)) {
+ runCompact(Arrays.copyOfRange(args, 1, args.length));
+ } else {
+ System.err.println("Unknown action \"" + action + "\"");
+ printHelp();
+ System.exit(1);
+ }
+ }
+
+ private static void runCompact(String[] args) throws Exception {
+ Optional<CompactAction> action = CompactAction.create(MultipleParameterTool.fromArgs(args));
+ if (action.isPresent()) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ action.get().build(env);
+ env.execute("Compact job: " + String.join(" ", args));
+ } else {
+ System.exit(1);
+ }
+ }
+
+ private static void printHelp() {
+ System.out.println("Usage: <action> [OPTIONS]");
+ System.out.println();
+
+ System.out.println("Available actions:");
+ System.out.println(" compact");
+ System.out.println();
+
+ System.out.println("For detailed options of each action, run <action> --help");
+ }
+}
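As the end-to-end test later in this commit shows, this entry point is submitted through the Flink CLI, along the lines of `flink run -c org.apache.flink.table.store.connector.action.FlinkActions lib/flink-table-store.jar compact --warehouse <warehouse-path> --database <database-name> --table <table-name>` (jar location and exact flags as used in FlinkActionsE2eTest below).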
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
index 3e9ee855..bad47854 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
@@ -57,7 +57,8 @@ public abstract class FlinkSink implements Serializable {
}
protected StoreSinkWrite.Provider createWriteProvider(String initialCommitUser) {
- if (table.options().changelogProducer() == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
+ if (table.options().changelogProducer() == CoreOptions.ChangelogProducer.FULL_COMPACTION
+ && !table.options().writeCompactionSkip()) {
long fullCompactionThresholdMs =
table.options().changelogProducerFullCompactionTriggerInterval().toMillis();
return (table, context, ioManager) ->
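With this extra condition, writers of tables configured with 'write.compaction-skip' = 'true' no longer wrap their sink in the full-compaction changelog writer; producing changelog through compaction is left to the dedicated compact job, whose CompactAction constructor above explicitly resets write.compaction-skip to false.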
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
index 58827b4f..e7e9a148 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
@@ -207,6 +207,20 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
return Optional.empty();
}
+ if (earliestId > identifierToCheck) {
+ throw new RuntimeException(
+ String.format(
+ "Can't find snapshot with identifier %d from user %s. "
+ + "Earliest snapshot from all users has identifier %d. "
+ + "This is rare but it might be that snapshots are expiring too fast.\n"
+ + "If this exception happens continuously, please consider increasing %s or %s.",
+ identifierToCheck,
+ commitUser,
+ earliestId,
+ CoreOptions.SNAPSHOT_TIME_RETAINED.key(),
+ CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key()));
+ }
+
// We must find the snapshot whose identifier is exactly `identifierToCheck`.
// We can't just compare with the latest snapshot identifier by this user (like what we do
// in `AbstractFileStoreWrite`), because even if the latest snapshot identifier is newer,
@@ -225,16 +239,7 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
}
}
- throw new RuntimeException(
- String.format(
- "All snapshot from user %s has identifier larger than %d. "
- + "This is rare but it might be that snapshots are expiring too fast.\n"
- + "If this exception happens continuously, please consider increasing "
- + CoreOptions.SNAPSHOT_TIME_RETAINED.key()
- + " or "
- + CoreOptions.SNAPSHOT_NUM_RETAINED_MIN,
- commitUser,
- identifierToCheck));
+ return Optional.empty();
}
@Override
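The net effect of this pair of hunks: instead of failing once every remaining snapshot from this user has an identifier larger than the one being checked, the method now throws only when the earliest retained snapshot across all users has already passed identifierToCheck (i.e. the wanted snapshot has expired), and otherwise returns Optional.empty() so the caller can simply check again later.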
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
index 607d703f..b759040e 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
@@ -21,10 +21,13 @@ package org.apache.flink.table.store.connector;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.store.connector.action.FlinkActions;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.MiniClusterWithClientResource;
@@ -115,6 +118,14 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
return sEnv;
}
+ private StreamExecutionEnvironment createStreamExecutionEnvironment() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+ env.getCheckpointConfig()
+ .setCheckpointInterval(ThreadLocalRandom.current().nextInt(900) + 100);
+ return env;
+ }
+
// ------------------------------------------------------------------------
// Constructed Tests
// ------------------------------------------------------------------------
@@ -225,6 +236,13 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
testFullCompactionChangelogProducerRandom(sEnv, random.nextInt(1, 3), random.nextBoolean());
}
+ @Test
+ public void testStandAloneFullCompactJobRandom() throws Exception {
+ TableEnvironment sEnv = createStreamingTableEnvironment();
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ testStandAloneFullCompactJobRandom(sEnv, random.nextInt(1, 3), random.nextBoolean());
+ }
+
private static final int NUM_PARTS = 4;
private static final int NUM_KEYS = 64;
private static final int NUM_VALUES = 1024;
@@ -253,6 +271,39 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
// if we can first read complete records then read incremental records correctly
Thread.sleep(ThreadLocalRandom.current().nextInt(5000));
+ checkFullCompactionTestResult(numProducers);
+ }
+
+ private void testStandAloneFullCompactJobRandom(
+ TableEnvironment tEnv, int numProducers, boolean enableConflicts) throws Exception {
+ testRandom(
+ tEnv,
+ numProducers,
+ false,
+ "'bucket' = '4',"
+ + "'changelog-producer' = 'full-compaction',"
+ + "'changelog-producer.compaction-interval' = '1s',"
+ + "'write.compaction-skip' = 'true'");
+
+ // sleep for a random amount of time to check
+ // if the stand-alone compactor job can find the first snapshot to compact correctly
+ Thread.sleep(ThreadLocalRandom.current().nextInt(2500));
+
+ for (int i = enableConflicts ? 2 : 1; i > 0; i--) {
+ StreamExecutionEnvironment env = createStreamExecutionEnvironment();
+ env.setParallelism(2);
+ FlinkActions.compact(new Path(path + "/default.db/T")).build(env);
+ env.executeAsync();
+ }
+
+ // sleep for a random amount of time to check
+ // if we can first read complete records then read incremental records correctly
+ Thread.sleep(ThreadLocalRandom.current().nextInt(2500));
+
+ checkFullCompactionTestResult(numProducers);
+ }
+
+ private void checkFullCompactionTestResult(int numProducers) throws Exception {
TableEnvironment sEnv = createStreamingTableEnvironment();
sEnv.getConfig()
.getConfiguration()
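Note on the new test: when enableConflicts is true, two stand-alone compactor jobs are launched against the same table, so the test also exercises commit conflict handling between concurrent dedicated compactors.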
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
new file mode 100644
index 00000000..332d04c7
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.action;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataSplit;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/** IT cases for {@link CompactAction}. */
+public class CompactActionITCase extends AbstractTestBase {
+
+ private static final RowType ROW_TYPE =
+ RowType.of(
+ new LogicalType[] {
+ DataTypes.INT().getLogicalType(),
+ DataTypes.INT().getLogicalType(),
+ DataTypes.INT().getLogicalType(),
+ DataTypes.INT().getLogicalType()
+ },
+ new String[] {"dt", "hh", "k", "v"});
+
+ private Path tablePath;
+ private String commitUser;
+
+ @Before
+ public void before() throws IOException {
+ tablePath = new Path(TEMPORARY_FOLDER.newFolder().toString());
+ commitUser = UUID.randomUUID().toString();
+ }
+
+ @Test(timeout = 60000)
+ public void testBatchCompact() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.WRITE_COMPACTION_SKIP.key(), "true");
+
+ FileStoreTable table = createFileStoreTable(options);
+ SnapshotManager snapshotManager = table.snapshotManager();
+ TableWrite write = table.newWrite(commitUser);
+ TableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(20221208, 15, 1, 100));
+ write.write(rowData(20221208, 16, 1, 100));
+ write.write(rowData(20221209, 15, 1, 100));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(20221208, 15, 2, 200));
+ write.write(rowData(20221208, 16, 2, 200));
+ write.write(rowData(20221209, 15, 2, 200));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+ Assert.assertEquals(2, snapshot.id());
+ Assert.assertEquals(Snapshot.CommitKind.APPEND, snapshot.commitKind());
+
+ write.close();
+ commit.close();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ new CompactAction(tablePath).withPartitions(getSpecifiedPartitions()).build(env);
+ env.execute();
+
+ snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+ Assert.assertEquals(3, snapshot.id());
+ Assert.assertEquals(Snapshot.CommitKind.COMPACT, snapshot.commitKind());
+
+ DataTableScan.DataFilePlan plan = table.newScan().plan();
+ Assert.assertEquals(3, plan.splits().size());
+ for (DataSplit split : plan.splits) {
+ if (split.partition().getInt(1) == 15) {
+ // compacted
+ Assert.assertEquals(1, split.files().size());
+ } else {
+ // not compacted
+ Assert.assertEquals(2, split.files().size());
+ }
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testStreamingCompact() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
+ options.put(CoreOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL.key(), "1s");
+ options.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
+ options.put(CoreOptions.WRITE_COMPACTION_SKIP.key(), "true");
+
+ FileStoreTable table = createFileStoreTable(options);
+ SnapshotManager snapshotManager = table.snapshotManager();
+ TableWrite write = table.newWrite(commitUser);
+ TableCommit commit = table.newCommit(commitUser);
+
+ // base records
+ write.write(rowData(20221208, 15, 1, 100));
+ write.write(rowData(20221208, 16, 1, 100));
+ write.write(rowData(20221209, 15, 1, 100));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+ Assert.assertEquals(1, snapshot.id());
+ Assert.assertEquals(Snapshot.CommitKind.APPEND, snapshot.commitKind());
+
+ // no full compaction has happened, so plan should be empty
+ SnapshotEnumerator snapshotEnumerator =
+ ContinuousDataFileSnapshotEnumerator.create(table, table.newScan(), null);
+ DataTableScan.DataFilePlan plan = snapshotEnumerator.enumerate();
+ Assert.assertEquals(1, (long) plan.snapshotId);
+ Assert.assertTrue(plan.splits().isEmpty());
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+ env.getCheckpointConfig().setCheckpointInterval(500);
+ new CompactAction(tablePath).withPartitions(getSpecifiedPartitions()).build(env);
+ env.executeAsync();
+
+ while (true) {
+ plan = snapshotEnumerator.enumerate();
+ if (plan != null) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ // first full compaction
+ Assert.assertEquals(2, (long) plan.snapshotId);
+ List<String> actual = getResult(table.newRead(), plan.splits());
+ actual.sort(String::compareTo);
+ Assert.assertEquals(Arrays.asList("+I 20221208|15|1|100", "+I 20221209|15|1|100"), actual);
+
+ // incremental records
+ write.write(rowData(20221208, 15, 1, 101));
+ write.write(rowData(20221208, 16, 1, 101));
+ write.write(rowData(20221209, 15, 1, 101));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ write.close();
+ commit.close();
+
+ while (true) {
+ plan = snapshotEnumerator.enumerate();
+ if (plan != null) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ // second full compaction
+ Assert.assertEquals(4, (long) plan.snapshotId);
+ actual = getResult(table.newRead(), plan.splits());
+ actual.sort(String::compareTo);
+ Assert.assertEquals(
+ Arrays.asList(
+ "+U 20221208|15|1|101",
+ "+U 20221209|15|1|101",
+ "-U 20221208|15|1|100",
+ "-U 20221209|15|1|100"),
+ actual);
+ }
+
+ private List<Map<String, String>> getSpecifiedPartitions() {
+ Map<String, String> partition1 = new HashMap<>();
+ partition1.put("dt", "20221208");
+ partition1.put("hh", "15");
+
+ Map<String, String> partition2 = new HashMap<>();
+ partition2.put("dt", "20221209");
+ partition2.put("hh", "15");
+
+ return Arrays.asList(partition1, partition2);
+ }
+
+ private GenericRowData rowData(Object... values) {
+ return GenericRowData.of(values);
+ }
+
+ private List<String> getResult(TableRead read, List<Split> splits) throws Exception {
+ List<ConcatRecordReader.ReaderSupplier<RowData>> readers = new ArrayList<>();
+ for (Split split : splits) {
+ readers.add(() -> read.createReader(split));
+ }
+ RecordReader<RowData> recordReader = ConcatRecordReader.create(readers);
+ RecordReaderIterator<RowData> iterator = new RecordReaderIterator<>(recordReader);
+ List<String> result = new ArrayList<>();
+ while (iterator.hasNext()) {
+ RowData rowData = iterator.next();
+ result.add(rowDataToString(rowData));
+ }
+ iterator.close();
+ return result;
+ }
+
+ private String rowDataToString(RowData rowData) {
+ return String.format(
+ "%s %d|%d|%d|%d",
+ rowData.getRowKind().shortString(),
+ rowData.getInt(0),
+ rowData.getInt(1),
+ rowData.getInt(2),
+ rowData.getInt(3));
+ }
+
+ private FileStoreTable createFileStoreTable(Map<String, String> options) throws Exception {
+ SchemaManager schemaManager = new SchemaManager(tablePath);
+ TableSchema tableSchema =
+ schemaManager.commitNewVersion(
+ new UpdateSchema(
+ ROW_TYPE,
+ Arrays.asList("dt", "hh"),
+ Arrays.asList("dt", "hh", "k"),
+ options,
+ ""));
+ return FileStoreTableFactory.create(tablePath, tableSchema);
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileDataFilterTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileDataFilterTestBase.java
index 47dadf70..5697ff7c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileDataFilterTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileDataFilterTestBase.java
@@ -258,7 +258,6 @@ public abstract class FileDataFilterTestBase extends SchemaEvolutionTableTestBas
// schema0, read all data
TableRead read1 =
table.newRead().withFilter(PredicateBuilder.or(predicateList));
- System.out.println(getResult(read1, splits, SCHEMA_1_ROW_TO_STRING));
assertThat(getResult(read1, splits, SCHEMA_1_ROW_TO_STRING))
.hasSameElementsAs(
Arrays.asList(
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
index fe09cdb1..2abae148 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
@@ -77,7 +77,7 @@ public abstract class E2eTestBase {
private final List<String> currentResults = new ArrayList<>();
protected DockerComposeContainer<?> environment;
- private ContainerState jobManager;
+ protected ContainerState jobManager;
@BeforeEach
public void before() throws Exception {
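The jobManager field is widened from private to protected so that the new FlinkActionsE2eTest can exec `flink run` inside the JobManager container.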
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
new file mode 100644
index 00000000..ada923b2
--- /dev/null
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.tests;
+
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+
+import java.util.UUID;
+
+/** Tests for {@link org.apache.flink.table.store.connector.action.FlinkActions}. */
+public class FlinkActionsE2eTest extends E2eTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkActionsE2eTest.class);
+
+ public FlinkActionsE2eTest() {
+ super(true, false);
+ }
+
+ @Test
+ public void testCompact() throws Exception {
+ String topicName = "ts-topic-" + UUID.randomUUID();
+ createKafkaTopic(topicName, 1);
+ // prepare first part of test data
+ sendKafkaMessage("1.csv", "20221205,1,100\n20221206,1,100\n20221207,1,100", topicName);
+
+ String testDataSourceDdl =
+ String.format(
+ "CREATE TEMPORARY TABLE test_source (\n"
+ + " dt STRING,\n"
+ + " k INT,\n"
+ + " v INT"
+ + ") WITH (\n"
+ + " 'connector' = 'kafka',\n"
+ + " 'properties.bootstrap.servers' = 'kafka:9092',\n"
+ + " 'properties.group.id' = 'testGroup',\n"
+ + " 'scan.startup.mode' = 'earliest-offset',\n"
+ + " 'topic' = '%s',\n"
+ + " 'format' = 'csv'\n"
+ + ");",
+ topicName);
+
+ String warehousePath = TEST_DATA_DIR + "/" + UUID.randomUUID() + ".store";
+ String catalogDdl =
+ String.format(
+ "CREATE CATALOG ts_catalog WITH (\n"
+ + " 'type' = 'table-store',\n"
+ + " 'warehouse' = '%s'\n"
+ + ");",
+ warehousePath);
+
+ String useCatalogCmd = "USE CATALOG ts_catalog;";
+
+ String tableDdl =
+ "CREATE TABLE IF NOT EXISTS ts_table (\n"
+ + " dt STRING,\n"
+ + " k INT,\n"
+ + " v INT,\n"
+ + " PRIMARY KEY (dt, k) NOT ENFORCED\n"
+ + ") PARTITIONED BY (dt) WITH (\n"
+ + " 'changelog-producer' = 'full-compaction',\n"
+ + " 'changelog-producer.compaction-interval' = '1s',\n"
+ + " 'write.compaction-skip' = 'true'\n"
+ + ");";
+
+ // insert data into table store
+ runSql(
+ "SET 'execution.checkpointing.interval' = '1s';\n"
+ + "INSERT INTO ts_table SELECT * FROM test_source;",
+ catalogDdl,
+ useCatalogCmd,
+ tableDdl,
+ testDataSourceDdl);
+
+ // run stand-alone compact job
+ Container.ExecResult execResult =
+ jobManager.execInContainer(
+ "bin/flink",
+ "run",
+ "-c",
+ "org.apache.flink.table.store.connector.action.FlinkActions",
+ "-D",
+ "execution.checkpointing.interval=1s",
+ "--detached",
+ "lib/flink-table-store.jar",
+ "compact",
+ "--warehouse",
+ warehousePath,
+ "--database",
+ "default",
+ "--table",
+ "ts_table",
+ "--partition",
+ "dt=20221205",
+ "--partition",
+ "dt=20221206");
+ LOG.info(execResult.getStdout());
+ LOG.info(execResult.getStderr());
+
+ // read all data from table store
+ runSql(
+ "INSERT INTO result1 SELECT * FROM ts_table;",
+ catalogDdl,
+ useCatalogCmd,
+ tableDdl,
+ createResultSink("result1", "dt STRING, k INT, v INT"));
+
+ // check that the first part of the test data is compacted
+ checkResult("20221205, 1, 100", "20221206, 1, 100");
+
+ // prepare second part of test data
+ sendKafkaMessage("2.csv", "20221205,1,101\n20221206,1,101\n20221207,1,101", topicName);
+
+ // check that the second part of the test data is compacted
+ checkResult("20221205, 1, 101", "20221206, 1, 101");
+ }
+
+ private void runSql(String sql, String... ddls) throws Exception {
+ runSql(String.join("\n", ddls) + "\n" + sql);
+ }
+}