You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/13 07:06:02 UTC
[flink-table-store] branch master updated: [FLINK-28465] Wait compaction finished in endInput
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new a407dd9c [FLINK-28465] Wait compaction finished in endInput
a407dd9c is described below
commit a407dd9cfd7ee62cfaed2debd0dc2fc21b69b191
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Wed Jul 13 15:05:58 2022 +0800
[FLINK-28465] Wait compaction finished in endInput
This closes #211
---
.../connector/sink/PrepareCommitOperator.java | 11 +-
.../store/connector/sink/StoreCompactOperator.java | 2 +-
.../store/connector/sink/StoreWriteOperator.java | 4 +-
.../store/connector/AlterTableCompactITCase.java | 3 +-
.../store/connector/AppendOnlyTableITCase.java | 3 +-
.../store/connector/ForceCompactionITCase.java | 214 +++++++++++++++++----
.../source/FileStoreSourceSplitReaderTest.java | 2 +-
.../source/TestChangelogDataReadWrite.java | 2 +-
.../table/store/file/data/AppendOnlyWriter.java | 7 +-
.../store/file/mergetree/MergeTreeWriter.java | 7 +-
.../table/store/file/writer/RecordWriter.java | 5 +-
.../table/store/table/sink/AbstractTableWrite.java | 4 +-
.../flink/table/store/table/sink/TableWrite.java | 2 +-
.../flink/table/store/file/TestFileStore.java | 2 +-
.../store/file/data/AppendOnlyWriterTest.java | 8 +-
.../table/store/file/mergetree/MergeTreeTest.java | 8 +-
.../store/file/operation/TestCommitThread.java | 4 +-
.../store/table/AppendOnlyFileStoreTableTest.java | 6 +-
.../ChangelogValueCountFileStoreTableTest.java | 6 +-
.../table/ChangelogWithKeyFileStoreTableTest.java | 12 +-
.../table/store/table/FileStoreTableTestBase.java | 6 +-
.../table/store/table/WritePreemptMemoryTest.java | 2 +-
.../hive/TableStoreHiveStorageHandlerITCase.java | 22 +--
.../store/mapred/TableStoreRecordReaderTest.java | 6 +-
.../table/store/spark/SimpleTableTestHelper.java | 2 +-
25 files changed, 243 insertions(+), 107 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java
index 7d686d9e..052bbdb8 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java
@@ -44,7 +44,7 @@ public abstract class PrepareCommitOperator extends AbstractStreamOperator<Commi
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
if (!endOfInput) {
- emitCommittables();
+ emitCommittables(false);
}
// no records are expected to emit after endOfInput
}
@@ -52,12 +52,13 @@ public abstract class PrepareCommitOperator extends AbstractStreamOperator<Commi
@Override
public void endInput() throws Exception {
endOfInput = true;
- emitCommittables();
+ emitCommittables(true);
}
- private void emitCommittables() throws IOException {
- prepareCommit().forEach(committable -> output.collect(new StreamRecord<>(committable)));
+ private void emitCommittables(boolean endOfInput) throws IOException {
+ prepareCommit(endOfInput)
+ .forEach(committable -> output.collect(new StreamRecord<>(committable)));
}
- protected abstract List<Committable> prepareCommit() throws IOException;
+ protected abstract List<Committable> prepareCommit(boolean endOfInput) throws IOException;
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
index 8ad6bf07..f4329d6b 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
@@ -58,7 +58,7 @@ public class StoreCompactOperator extends PrepareCommitOperator {
}
@Override
- protected List<Committable> prepareCommit() throws IOException {
+ protected List<Committable> prepareCommit(boolean endOfInput) throws IOException {
return compact.compact().stream()
.map(c -> new Committable(Committable.Kind.FILE, c))
.collect(Collectors.toList());
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
index 6679ca21..1c14ec0b 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
@@ -181,10 +181,10 @@ public class StoreWriteOperator extends PrepareCommitOperator {
}
@Override
- protected List<Committable> prepareCommit() throws IOException {
+ protected List<Committable> prepareCommit(boolean endOfInput) throws IOException {
List<Committable> committables = new ArrayList<>();
try {
- for (FileCommittable committable : write.prepareCommit()) {
+ for (FileCommittable committable : write.prepareCommit(endOfInput)) {
committables.add(new Committable(Committable.Kind.FILE, committable));
}
} catch (Exception e) {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
index 9d2708a0..34b97668 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
@@ -92,8 +92,7 @@ public class AlterTableCompactITCase extends FileStoreTableITCase {
@Test
public void testChangeNumOfSortedRunTrigger() {
- // force auto-compaction and increase trigger
- batchSql("ALTER TABLE T0 SET ('commit.force-compact' = 'true')");
+ // increase trigger
batchSql("ALTER TABLE T0 SET ('num-sorted-run.compaction-trigger' = '5')");
// write duplicates
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
index 70f06993..6391a00c 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
@@ -127,7 +127,6 @@ public class AppendOnlyTableITCase extends FileStoreTableITCase {
@Test
public void testAutoCompaction() {
- batchSql("ALTER TABLE append_table SET ('commit.force-compact' = 'true')");
batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '2')");
batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')");
@@ -200,7 +199,7 @@ public class AppendOnlyTableITCase extends FileStoreTableITCase {
@Override
protected List<String> ddl() {
return Collections.singletonList(
- "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) WITH ('write-mode'='append-only', 'commit.force-compact' = 'true')");
+ "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) WITH ('write-mode'='append-only')");
}
private void testRejectChanges(RowKind kind) {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
index c2a0cd39..18c9a9f6 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
@@ -18,6 +18,18 @@
package org.apache.flink.table.store.connector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.stats.BinaryTableStats;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
import org.junit.Test;
import java.util.Arrays;
@@ -25,9 +37,11 @@ import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
-/** ITCase for enabling commit.force-compact. */
+/** ITCase for auto-enabling commit.force-compact under batch mode. */
public class ForceCompactionITCase extends FileStoreTableITCase {
+ private final FileFormat avro = FileFormat.fromIdentifier("avro", new Configuration());
+
@Override
protected List<String> ddl() {
return Arrays.asList(
@@ -40,73 +54,193 @@ public class ForceCompactionITCase extends FileStoreTableITCase {
+ " f0 INT\n, "
+ " f1 STRING\n, "
+ " f2 STRING\n"
- + ")");
+ + ")",
+ "CREATE TABLE IF NOT EXISTS T2 (\n"
+ + " f0 INT\n, "
+ + " f1 STRING\n, "
+ + " f2 STRING\n"
+ + ") WITH (\n"
+ + "'write-mode' = 'append-only')");
}
@Test
public void testDynamicPartition() {
batchSql("ALTER TABLE T SET ('num-levels' = '3')");
- batchSql("ALTER TABLE T SET ('commit.force-compact' = 'true')");
- batchSql(
+
+ // Winter: 1, Spring: 1, Summer: 1
+ assertAppend(
"INSERT INTO T VALUES(1, 'Winter', 'Winter is Coming'),"
+ "(2, 'Winter', 'The First Snowflake'), "
+ "(2, 'Spring', 'The First Rose in Spring'), "
- + "(7, 'Summer', 'Summertime Sadness')");
- batchSql("INSERT INTO T VALUES(12, 'Winter', 'Last Christmas')");
- batchSql("INSERT INTO T VALUES(11, 'Winter', 'Winter is Coming')");
- batchSql("INSERT INTO T VALUES(10, 'Autumn', 'Refrain')");
- batchSql(
+ + "(7, 'Summer', 'Summertime Sadness')",
+ "T",
+ 1L);
+
+ // Winter: 2
+ assertAppend("INSERT INTO T VALUES(12, 'Winter', 'Last Christmas')", "T", 2L);
+
+ // Winter: 3, Spring: 2
+ assertAppend(
+ "INSERT INTO T VALUES(11, 'Winter', 'Winter is Coming'), "
+ + "(4, 'Spring', 'April')",
+ "T",
+ 3L);
+
+ // Autumn: 1
+ assertAppend("INSERT INTO T VALUES(10, 'Autumn', 'Refrain')", "T", 4L);
+
+ // Summer: 2, Spring: 3
+ assertAppend(
"INSERT INTO T VALUES(6, 'Summer', 'Watermelon Sugar'), "
- + "(4, 'Spring', 'Spring Water')");
- batchSql(
+ + "(4, 'Spring', 'Spring Water')",
+ "T",
+ 5L);
+
+ // Summer: 3, Autumn: 2
+ assertAppend(
"INSERT INTO T VALUES(66, 'Summer', 'Summer Vibe'),"
- + " (9, 'Autumn', 'Wake Me Up When September Ends')");
- batchSql(
+ + " (9, 'Autumn', 'Wake Me Up When September Ends')",
+ "T",
+ 6L);
+
+ // Summer: 4, Autumn: 3
+ assertAppend(
"INSERT INTO T VALUES(666, 'Summer', 'Summer Vibe'),"
- + " (9, 'Autumn', 'Wake Me Up When September Ends')");
- batchSql(
+ + " (9, 'Autumn', 'Wake Me Up When September Ends')",
+ "T",
+ 7L);
+
+ // Summer: 5, Autumn: 4, trigger compaction for partition 'Summer'
+ assertCompact(
"INSERT INTO T VALUES(6666, 'Summer', 'Summer Vibe'),"
- + " (9, 'Autumn', 'Wake Me Up When September Ends')");
- batchSql(
+ + " (9, 'Autumn', 'Wake Me Up When September Ends')",
+ "T",
+ 9L,
+ "Summer",
+ "Summer");
+
+ // Summer: 6, Autumn: 5, trigger compaction for partition 'Autumn'
+ assertCompact(
"INSERT INTO T VALUES(66666, 'Summer', 'Summer Vibe'),"
- + " (9, 'Autumn', 'Wake Me Up When September Ends')");
- batchSql(
- "INSERT INTO T VALUES(666666, 'Summer', 'Summer Vibe'),"
- + " (9, 'Autumn', 'Wake Me Up When September Ends')");
- batchSql(
- "INSERT INTO T VALUES(6666666, 'Summer', 'Summer Vibe'),"
- + " (9, 'Autumn', 'Wake Me Up When September Ends')");
-
- assertThat(batchSql("SELECT * FROM T")).hasSize(21);
+ + " (9, 'Autumn', 'Wake Me Up When September Ends')",
+ "T",
+ 11L,
+ "Autumn",
+ "Autumn");
+
+ // Winter: 4, Spring: 4
+ assertAppend(
+ "INSERT INTO T VALUES(1, 'Winter', 'Cold Water'), (4, 'Spring', 'SpringBoot')",
+ "T",
+ 12L);
+
+ // Winter: 5, Spring: 5, trigger compaction for both partitions
+ assertCompact(
+ "INSERT INTO T VALUES(1, 'Winter', 'Winter is Coming'),"
+ + " (4, 'Spring', 'The First Rose in Spring')",
+ "T",
+ 14L,
+ "Spring",
+ "Winter");
+
+ assertThat(batchSql("SELECT * FROM T")).hasSize(22);
}
@Test
public void testNoDefaultNumOfLevels() {
- batchSql("ALTER TABLE T1 SET ('commit.force-compact' = 'true')");
- batchSql(
+ assertAppend(
"INSERT INTO T1 VALUES(1, 'Winter', 'Winter is Coming'),"
+ "(2, 'Winter', 'The First Snowflake'), "
+ "(2, 'Spring', 'The First Rose in Spring'), "
- + "(7, 'Summer', 'Summertime Sadness')");
- batchSql("INSERT INTO T1 VALUES(12, 'Winter', 'Last Christmas')");
- batchSql("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is Coming')");
- batchSql("INSERT INTO T1 VALUES(10, 'Autumn', 'Refrain')");
- batchSql(
+ + "(7, 'Summer', 'Summertime Sadness')",
+ "T1",
+ 1L);
+
+ assertAppend("INSERT INTO T1 VALUES(12, 'Winter', 'Last Christmas')", "T1", 2L);
+
+ assertAppend("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is Coming')", "T1", 3L);
+
+ assertAppend("INSERT INTO T1 VALUES(10, 'Autumn', 'Refrain')", "T1", 4L);
+
+ assertCompact(
"INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon Sugar'), "
- + "(4, 'Spring', 'Spring Water')");
- batchSql(
+ + "(4, 'Spring', 'Spring Water')",
+ "T1",
+ 6L);
+
+ assertAppend(
"INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe'), "
- + "(9, 'Autumn', 'Wake Me Up When September Ends')");
+ + "(9, 'Autumn', 'Wake Me Up When September Ends')",
+ "T1",
+ 7L);
- batchSql(
+ assertAppend(
"INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'), "
- + "(9, 'Autumn', 'Wake Me Up When September Ends')");
+ + "(9, 'Autumn', 'Wake Me Up When September Ends')",
+ "T1",
+ 8L);
+ // change num-sorted-run.compaction-trigger to test restoring the LSM tree
batchSql("ALTER TABLE T1 SET ('num-sorted-run.compaction-trigger' = '2')");
- batchSql(
+
+ assertCompact(
"INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'), "
- + "(9, 'Autumn', 'Wake Me Up When September Ends')");
+ + "(9, 'Autumn', 'Wake Me Up When September Ends')",
+ "T1",
+ 10L);
assertThat(batchSql("SELECT * FROM T1")).hasSize(15);
}
+
+ @Test
+ public void testForceCompact() {
+ // explicitly set commit.force-compact to false to verify that compaction is still forced
+ batchSql("ALTER TABLE T1 SET ('commit.force-compact' = 'false')");
+ batchSql("ALTER TABLE T1 SET ('num-sorted-run.compaction-trigger' = '2')");
+
+ assertAppend("INSERT INTO T1 VALUES (1, 'Winter', 'Winter is Coming')", "T1", 1L);
+ assertAppend("INSERT INTO T1 VALUES (2, 'Spring', 'Spring Water')", "T1", 2L);
+ assertCompact("INSERT INTO T1 VALUES (3, 'Summer', 'Summer Vibe')", "T1", 4L);
+
+ batchSql("ALTER TABLE T2 SET ('commit.force-compact' = 'false')");
+ batchSql("ALTER TABLE T2 SET ('compaction.early-max.file-num' = '2')");
+ assertAppend("INSERT INTO T2 VALUES (1, 'Winter', 'Winter is Coming')", "T2", 1L);
+ assertCompact("INSERT INTO T2 VALUES (2, 'Spring', 'Spring Water')", "T2", 3L);
+ }
+
+ private void assertAppend(String sql, String tableName, long expectSnapshotId) {
+ batchSql(sql);
+ Snapshot snapshot = findLatestSnapshot(tableName, true);
+ assertThat(snapshot.id()).isEqualTo(expectSnapshotId);
+ assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+ }
+
+ private void assertCompact(
+ String sql, String tableName, long expectSnapshotId, String... expectPartMinMax) {
+ batchSql(sql);
+ Snapshot snapshot = findLatestSnapshot(tableName, true);
+ assertThat(snapshot.id()).isEqualTo(expectSnapshotId);
+ assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+ RowType partType =
+ expectPartMinMax.length > 0
+ ? RowType.of(new LogicalType[] {new VarCharType()}, new String[] {"f1"})
+ : RowType.of();
+ FileStorePathFactory pathFactory =
+ new FileStorePathFactory(
+ getTableDirectory(tableName, true),
+ partType,
+ "default",
+ CoreOptions.FILE_FORMAT.defaultValue());
+
+ List<ManifestFileMeta> manifestFileMetas =
+ new ManifestList.Factory(partType, avro, pathFactory)
+ .create()
+ .read(snapshot.deltaManifestList());
+ assertThat(manifestFileMetas.get(0).numDeletedFiles()).isGreaterThanOrEqualTo(1);
+ BinaryTableStats partStats = manifestFileMetas.get(0).partitionStats();
+ if (expectPartMinMax.length > 0) {
+ assertThat(partStats.min().getString(0).toString()).isEqualTo(expectPartMinMax[0]);
+ assertThat(partStats.max().getString(0).toString()).isEqualTo(expectPartMinMax[1]);
+ }
+ }
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
index 214fbca8..8da3af35 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -147,7 +147,7 @@ public class FileStoreSourceSplitReaderTest {
writer.write(
new KeyValue()
.replace(GenericRowData.of(222L), RowKind.DELETE, GenericRowData.of(333L)));
- List<DataFileMeta> files = writer.prepareCommit().newFiles();
+ List<DataFileMeta> files = writer.prepareCommit(true).newFiles();
writer.close();
assignSplit(reader, newSourceSplit("id1", row(1), 0, files, true));
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 94a02856..1d26808c 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -134,7 +134,7 @@ public class TestChangelogDataReadWrite {
RowKind.INSERT,
GenericRowData.of(tuple2.f1)));
}
- List<DataFileMeta> files = writer.prepareCommit().newFiles();
+ List<DataFileMeta> files = writer.prepareCommit(true).newFiles();
writer.close();
return new ArrayList<>(files);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
index 305a7133..f58e9f60 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
@@ -109,7 +109,7 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
}
@Override
- public Increment prepareCommit() throws Exception {
+ public Increment prepareCommit(boolean endOnfInput) throws Exception {
List<DataFileMeta> newFiles = new ArrayList<>();
if (writer != null) {
writer.close();
@@ -130,7 +130,10 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
// add new generated files
toCompact.addAll(newFiles);
submitCompaction();
- finishCompaction(forceCompact);
+
+ boolean blocking = endOnfInput || forceCompact;
+ finishCompaction(blocking);
+
return drainIncrement(newFiles);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 50fea94f..c838a253 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -179,11 +179,10 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
}
@Override
- public Increment prepareCommit() throws Exception {
+ public Increment prepareCommit(boolean endOfInput) throws Exception {
flushMemory();
- if (commitForceCompact) {
- finishCompaction(true);
- }
+ boolean blocking = endOfInput || commitForceCompact;
+ finishCompaction(blocking);
return drainIncrement();
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java
index 5b0ce6aa..e072a797 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java
@@ -26,7 +26,7 @@ import java.util.List;
/**
* The {@code RecordWriter} is responsible for writing data and handling in-progress files used to
* write yet un-staged data. The incremental files ready to commit is returned to the system by the
- * {@link #prepareCommit()}.
+ * {@link #prepareCommit(boolean)}.
*
* @param <T> type of record to write.
*/
@@ -38,9 +38,10 @@ public interface RecordWriter<T> {
/**
* Prepare for a commit.
*
+ * @param endOfInput whether the input has ended, i.e. no further committables will follow.
* @return Incremental files in this snapshot cycle
*/
- Increment prepareCommit() throws Exception;
+ Increment prepareCommit(boolean endOfInput) throws Exception;
/**
* Sync the writer. The structure related to file reading and writing is thread unsafe, there
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
index 8ba3bfc7..51b666c7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
@@ -78,7 +78,7 @@ public abstract class AbstractTableWrite<T> implements TableWrite {
}
@Override
- public List<FileCommittable> prepareCommit() throws Exception {
+ public List<FileCommittable> prepareCommit(boolean endOfInput) throws Exception {
List<FileCommittable> result = new ArrayList<>();
Iterator<Map.Entry<BinaryRowData, Map<Integer, RecordWriter<T>>>> partIter =
@@ -93,7 +93,7 @@ public abstract class AbstractTableWrite<T> implements TableWrite {
int bucket = entry.getKey();
RecordWriter<T> writer = entry.getValue();
FileCommittable committable =
- new FileCommittable(partition, bucket, writer.prepareCommit());
+ new FileCommittable(partition, bucket, writer.prepareCommit(endOfInput));
result.add(committable);
// clear if no update
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
index 5384f892..3a5615d8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
@@ -34,7 +34,7 @@ public interface TableWrite {
SinkRecord write(RowData rowData) throws Exception;
- List<FileCommittable> prepareCommit() throws Exception;
+ List<FileCommittable> prepareCommit(boolean endOfInput) throws Exception;
void close() throws Exception;
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 3a6e39ca..e186a88f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -229,7 +229,7 @@ public class TestFileStore extends KeyValueFileStore {
writers.entrySet()) {
for (Map.Entry<Integer, RecordWriter<KeyValue>> entryWithBucket :
entryWithPartition.getValue().entrySet()) {
- Increment increment = entryWithBucket.getValue().prepareCommit();
+ Increment increment = entryWithBucket.getValue().prepareCommit(emptyWriter);
committable.addFileCommittable(
entryWithPartition.getKey(), entryWithBucket.getKey(), increment);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
index a60380c0..a16411a4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
@@ -81,7 +81,7 @@ public class AppendOnlyWriterTest {
for (int i = 0; i < 3; i++) {
writer.sync();
- Increment inc = writer.prepareCommit();
+ Increment inc = writer.prepareCommit(true);
assertThat(inc.newFiles()).isEqualTo(Collections.emptyList());
assertThat(inc.compactBefore()).isEqualTo(Collections.emptyList());
@@ -134,7 +134,7 @@ public class AppendOnlyWriterTest {
}
writer.sync();
- Increment inc = writer.prepareCommit();
+ Increment inc = writer.prepareCommit(true);
if (txn > 0 && txn % 3 == 0) {
assertThat(inc.compactBefore()).hasSize(4);
assertThat(inc.compactAfter()).hasSize(1);
@@ -191,7 +191,7 @@ public class AppendOnlyWriterTest {
}
writer.sync();
- Increment firstInc = writer.prepareCommit();
+ Increment firstInc = writer.prepareCommit(true);
assertThat(firstInc.compactBefore()).isEqualTo(Collections.emptyList());
assertThat(firstInc.compactAfter()).isEqualTo(Collections.emptyList());
@@ -228,7 +228,7 @@ public class AppendOnlyWriterTest {
assertThat(writer.getToCompact()).containsExactlyElementsOf(firstInc.newFiles());
writer.write(row(id, String.format("%03d", id), PART));
writer.sync();
- Increment secInc = writer.prepareCommit();
+ Increment secInc = writer.prepareCommit(true);
// check compact before and after
List<DataFileMeta> compactBefore = secInc.compactBefore();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 699eeb54..8a5793d4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -166,10 +166,10 @@ public class MergeTreeTest {
@Test
public void testRestore() throws Exception {
List<TestRecord> expected = new ArrayList<>(writeBatch());
- List<DataFileMeta> newFiles = writer.prepareCommit().newFiles();
+ List<DataFileMeta> newFiles = writer.prepareCommit(true).newFiles();
writer = createMergeTreeWriter(newFiles);
expected.addAll(writeBatch());
- writer.prepareCommit();
+ writer.prepareCommit(true);
writer.sync();
assertRecords(expected);
}
@@ -206,7 +206,7 @@ public class MergeTreeTest {
}
writeAll(records);
expected.addAll(records);
- Increment increment = writer.prepareCommit();
+ Increment increment = writer.prepareCommit(true);
mergeCompacted(newFileNames, compactedFiles, increment);
}
writer.close();
@@ -237,7 +237,7 @@ public class MergeTreeTest {
writer.sync();
}
- Increment increment = writer.prepareCommit();
+ Increment increment = writer.prepareCommit(true);
newFiles.addAll(increment.newFiles());
mergeCompacted(newFileNames, compactedFiles, increment);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index fe1b7d7e..070c3767 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -104,7 +104,7 @@ public class TestCommitThread extends Thread {
ManifestCommittable committable =
new ManifestCommittable(String.valueOf(new Random().nextLong()));
for (Map.Entry<BinaryRowData, MergeTreeWriter> entry : writers.entrySet()) {
- committable.addFileCommittable(entry.getKey(), 0, entry.getValue().prepareCommit());
+ committable.addFileCommittable(entry.getKey(), 0, entry.getValue().prepareCommit(true));
}
runWithRetry(committable, () -> commit.commit(committable, Collections.emptyMap()));
@@ -114,7 +114,7 @@ public class TestCommitThread extends Thread {
BinaryRowData partition = overwriteData();
ManifestCommittable committable =
new ManifestCommittable(String.valueOf(new Random().nextLong()));
- committable.addFileCommittable(partition, 0, writers.get(partition).prepareCommit());
+ committable.addFileCommittable(partition, 0, writers.get(partition).prepareCommit(true));
runWithRetry(
committable,
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 78cd1c8f..f4498949 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -147,17 +147,17 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
write.write(GenericRowData.of(1, 10, 100L));
write.write(GenericRowData.of(2, 20, 200L));
write.write(GenericRowData.of(1, 11, 101L));
- commit.commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit(true));
write.write(GenericRowData.of(1, 12, 102L));
write.write(GenericRowData.of(2, 21, 201L));
write.write(GenericRowData.of(2, 22, 202L));
- commit.commit("1", write.prepareCommit());
+ commit.commit("1", write.prepareCommit(true));
write.write(GenericRowData.of(1, 11, 101L));
write.write(GenericRowData.of(2, 21, 201L));
write.write(GenericRowData.of(1, 12, 102L));
- commit.commit("2", write.prepareCommit());
+ commit.commit("2", write.prepareCommit(true));
write.close();
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index e39ac3dd..24733a7b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -147,20 +147,20 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
write.write(GenericRowData.of(1, 10, 100L));
write.write(GenericRowData.of(2, 20, 200L));
write.write(GenericRowData.of(1, 11, 101L));
- commit.commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit(true));
write.write(GenericRowData.of(2, 21, 201L));
write.write(GenericRowData.of(1, 12, 102L));
write.write(GenericRowData.of(2, 21, 201L));
write.write(GenericRowData.of(2, 21, 201L));
- commit.commit("1", write.prepareCommit());
+ commit.commit("1", write.prepareCommit(true));
write.write(GenericRowData.of(1, 11, 101L));
write.write(GenericRowData.of(2, 22, 202L));
write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 21, 201L));
write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 10, 100L));
write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 21, 201L));
- commit.commit("2", write.prepareCommit());
+ commit.commit("2", write.prepareCommit(true));
write.close();
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 0f3c31a1..a7a5a318 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -58,9 +58,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
write.write(GenericRowData.of(1, 10, 200L));
write.write(GenericRowData.of(1, 10, 100L));
write.write(GenericRowData.of(1, 11, 101L));
- commit.commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit(true));
write.write(GenericRowData.of(1, 11, 55L));
- commit.commit("1", write.prepareCommit());
+ commit.commit("1", write.prepareCommit(true));
write.close();
List<Split> splits = table.newScan().plan().splits;
@@ -169,7 +169,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
write.write(GenericRowData.of(1, 10, 101L));
write.write(GenericRowData.ofKind(RowKind.UPDATE_BEFORE, 1, 10, 101L));
write.write(GenericRowData.ofKind(RowKind.UPDATE_AFTER, 1, 10, 102L));
- commit.commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit(true));
write.close();
List<Split> splits = table.newScan().withIncremental(true).plan().splits;
@@ -192,19 +192,19 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
write.write(GenericRowData.of(1, 10, 100L));
write.write(GenericRowData.of(2, 20, 200L));
write.write(GenericRowData.of(1, 11, 101L));
- commit.commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit(true));
write.write(GenericRowData.of(1, 10, 1000L));
write.write(GenericRowData.of(2, 21, 201L));
write.write(GenericRowData.of(2, 21, 2001L));
- commit.commit("1", write.prepareCommit());
+ commit.commit("1", write.prepareCommit(true));
write.write(GenericRowData.of(1, 11, 1001L));
write.write(GenericRowData.of(2, 21, 20001L));
write.write(GenericRowData.of(2, 22, 202L));
write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 11, 1001L));
write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 20, 200L));
- commit.commit("2", write.prepareCommit());
+ commit.commit("2", write.prepareCommit(true));
write.close();
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 86e28662..2b818185 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -87,7 +87,7 @@ public abstract class FileStoreTableTestBase {
TableCommit commit = table.newCommit("user");
write.write(GenericRowData.of(1, 10, 100L));
write.write(GenericRowData.of(2, 20, 200L));
- commit.commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit(true));
write.close();
write = table.newWrite().withOverwrite(true);
@@ -95,7 +95,7 @@ public abstract class FileStoreTableTestBase {
write.write(GenericRowData.of(2, 21, 201L));
Map<String, String> overwritePartition = new HashMap<>();
overwritePartition.put("pt", "2");
- commit.withOverwritePartition(overwritePartition).commit("1", write.prepareCommit());
+ commit.withOverwritePartition(overwritePartition).commit("1", write.prepareCommit(true));
write.close();
List<Split> splits = table.newScan().plan().splits;
@@ -121,7 +121,7 @@ public abstract class FileStoreTableTestBase {
write.write(GenericRowData.of(1, 5, 6L));
write.write(GenericRowData.of(1, 7, 8L));
write.write(GenericRowData.of(1, 9, 10L));
- table.newCommit("user").commit("0", write.prepareCommit());
+ table.newCommit("user").commit("0", write.prepareCommit(true));
write.close();
List<Split> splits =
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index aa96da9e..d4ee1d13 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -73,7 +73,7 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
write.write(row);
expected.add(BATCH_ROW_TO_STRING.apply(row));
}
- List<FileCommittable> committables = write.prepareCommit();
+ List<FileCommittable> committables = write.prepareCommit(true);
commit.commit("0", committables);
write.close();
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index 80f7d5b7..c677eb51 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -109,7 +109,7 @@ public class TableStoreHiveStorageHandlerITCase {
write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi Again")));
write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 30L, StringData.fromString("World")));
write.write(GenericRowData.of(2, 40L, StringData.fromString("Test")));
- commit.commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit(true));
write.close();
hiveShell.execute(
@@ -152,7 +152,7 @@ public class TableStoreHiveStorageHandlerITCase {
write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 30L, StringData.fromString("World")));
write.write(GenericRowData.of(2, 40L, StringData.fromString("Test")));
- commit.commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit(true));
write.close();
hiveShell.execute(
@@ -199,7 +199,7 @@ public class TableStoreHiveStorageHandlerITCase {
for (GenericRowData rowData : input) {
write.write(rowData);
}
- commit.commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit(true));
write.close();
hiveShell.execute(
@@ -304,17 +304,17 @@ public class TableStoreHiveStorageHandlerITCase {
TableWrite write = table.newWrite();
TableCommit commit = table.newCommit("user");
write.write(GenericRowData.of(1));
- commit.commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit(true));
write.write(GenericRowData.of((Object) null));
- commit.commit("1", write.prepareCommit());
+ commit.commit("1", write.prepareCommit(true));
write.write(GenericRowData.of(2));
write.write(GenericRowData.of(3));
write.write(GenericRowData.of((Object) null));
- commit.commit("2", write.prepareCommit());
+ commit.commit("2", write.prepareCommit(true));
write.write(GenericRowData.of(4));
write.write(GenericRowData.of(5));
write.write(GenericRowData.of(6));
- commit.commit("3", write.prepareCommit());
+ commit.commit("3", write.prepareCommit(true));
write.close();
hiveShell.execute(
@@ -400,15 +400,15 @@ public class TableStoreHiveStorageHandlerITCase {
375, /* 1971-01-11 */
TimestampData.fromLocalDateTime(
LocalDateTime.of(2022, 5, 17, 17, 29, 20))));
- commit.commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit(true));
write.write(GenericRowData.of(null, null));
- commit.commit("1", write.prepareCommit());
+ commit.commit("1", write.prepareCommit(true));
write.write(GenericRowData.of(376 /* 1971-01-12 */, null));
write.write(
GenericRowData.of(
null,
TimestampData.fromLocalDateTime(LocalDateTime.of(2022, 6, 18, 8, 30, 0))));
- commit.commit("2", write.prepareCommit());
+ commit.commit("2", write.prepareCommit(true));
write.close();
hiveShell.execute(
@@ -459,7 +459,7 @@ public class TableStoreHiveStorageHandlerITCase {
write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
write.write(GenericRowData.of(2, 20L, StringData.fromString("Hello")));
write.write(GenericRowData.of(3, 30L, StringData.fromString("World")));
- commit.commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit(true));
write.close();
hiveShell.execute(
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
index 4bf2744c..e8106dc3 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
@@ -78,7 +78,7 @@ public class TableStoreRecordReaderTest {
write.write(GenericRowData.of(3L, StringData.fromString("World")));
write.write(GenericRowData.of(1L, StringData.fromString("Hi again")));
write.write(GenericRowData.ofKind(RowKind.DELETE, 2L, StringData.fromString("Hello")));
- commit.commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit(true));
TableStoreRecordReader reader = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
RowDataContainer container = reader.createValue();
@@ -120,7 +120,7 @@ public class TableStoreRecordReaderTest {
write.write(GenericRowData.of(1, StringData.fromString("Hi")));
write.write(GenericRowData.ofKind(RowKind.DELETE, 2, StringData.fromString("Hello")));
write.write(GenericRowData.of(1, StringData.fromString("Hi")));
- commit.commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit(true));
TableStoreRecordReader reader = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
RowDataContainer container = reader.createValue();
@@ -160,7 +160,7 @@ public class TableStoreRecordReaderTest {
write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
write.write(GenericRowData.of(2, 20L, StringData.fromString("Hello")));
write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
- commit.commit("0", write.prepareCommit());
+ commit.commit("0", write.prepareCommit(true));
TableStoreRecordReader reader =
read(table, BinaryRowDataUtil.EMPTY_ROW, 0, Arrays.asList("c", "a"));
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
index 680b8dea..705a961a 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
@@ -65,6 +65,6 @@ public class SimpleTableTestHelper {
}
public void commit() throws Exception {
- commit.commit(UUID.randomUUID().toString(), writer.prepareCommit());
+ commit.commit(UUID.randomUUID().toString(), writer.prepareCommit(true));
}
}