You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by cz...@apache.org on 2022/11/03 02:58:46 UTC
[flink-table-store] branch master updated: [FLINK-29842] Change commitIdentifier in Table Store snapshot to long value
This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 35062743 [FLINK-29842] Change commitIdentifier in Table Store snapshot to long value
35062743 is described below
commit 350627439a70b187f7270278b497841fa3f2c554
Author: tsreaper <ts...@gmail.com>
AuthorDate: Thu Nov 3 10:58:40 2022 +0800
[FLINK-29842] Change commitIdentifier in Table Store snapshot to long value
This closes #346.
---
.../table/store/connector/sink/StoreCommitter.java | 3 +-
.../apache/flink/table/store/file/Snapshot.java | 27 +++++--
.../store/file/manifest/ManifestCommittable.java | 8 +-
.../manifest/ManifestCommittableSerializer.java | 4 +-
.../store/file/operation/FileStoreCommitImpl.java | 10 +--
.../table/store/table/metadata/SnapshotsTable.java | 5 +-
.../flink/table/store/table/sink/TableCommit.java | 4 +-
.../flink/table/store/file/TestFileStore.java | 10 ++-
.../ManifestCommittableSerializerTest.java | 3 +-
.../store/file/operation/FileStoreCommitTest.java | 4 +-
.../store/file/operation/TestCommitThread.java | 15 ++--
.../store/table/AppendOnlyFileStoreTableTest.java | 8 +-
.../ChangelogValueCountFileStoreTableTest.java | 8 +-
.../table/ChangelogWithKeyFileStoreTableTest.java | 91 +++++++++++++---------
.../table/store/table/FileStoreTableTestBase.java | 26 +++----
.../table/store/table/SchemaEvolutionTest.java | 4 +-
.../table/store/table/WritePreemptMemoryTest.java | 2 +-
.../hive/TableStoreHiveStorageHandlerITCase.java | 24 +++---
.../store/mapred/TableStoreRecordReaderTest.java | 6 +-
.../table/store/spark/SimpleTableTestHelper.java | 7 +-
.../table/store/spark/SimpleTableTestHelper.java | 7 +-
21 files changed, 160 insertions(+), 116 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
index 9fd720a5..ae43cb69 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
@@ -42,8 +42,7 @@ public class StoreCommitter implements Committer {
@Override
public ManifestCommittable combine(long checkpointId, List<Committable> committables) {
- ManifestCommittable manifestCommittable =
- new ManifestCommittable(String.valueOf(checkpointId));
+ ManifestCommittable manifestCommittable = new ManifestCommittable(checkpointId);
for (Committable committable : committables) {
switch (committable.kind()) {
case FILE:
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
index 72f5273e..177c470a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
@@ -38,7 +38,7 @@ import java.util.Map;
/**
* This file is the entrance to all data committed at some specific time point.
*
- * <p>Change list:
+ * <p>Versioned change list:
*
* <ul>
* <li>Version 1: Initial version for table store <= 0.2. There is no "version" field in json
@@ -46,6 +46,15 @@ import java.util.Map;
* <li>Version 2: Introduced in table store 0.3. Add "version" field and "changelogManifestList"
* field.
* </ul>
+ *
+ * <p>Unversioned change list:
+ *
+ * <ul>
* <li>Since table store 0.2.2 and table store 0.3, commitIdentifier is changed from a String to a
* long value. For table store < 0.2.2, only Flink connectors have table store sink and they
+ * use checkpointId as commitIdentifier (which is a long value). Json can automatically
+ * perform type conversion so there is no compatibility issue.
+ * </ul>
*/
public class Snapshot {
@@ -96,9 +105,15 @@ public class Snapshot {
@JsonProperty(FIELD_COMMIT_USER)
private final String commitUser;
- // for deduplication
+ // Mainly for snapshot deduplication.
+ //
+ // If multiple snapshots have the same commitIdentifier, reading from any of these snapshots
+ // must produce the same table.
+ //
+ // If snapshot A has a smaller commitIdentifier than snapshot B, then snapshot A must be
+ // committed before snapshot B, and thus snapshot A must contain older records than snapshot B.
@JsonProperty(FIELD_COMMIT_IDENTIFIER)
- private final String commitIdentifier;
+ private final long commitIdentifier;
@JsonProperty(FIELD_COMMIT_KIND)
private final CommitKind commitKind;
@@ -116,7 +131,7 @@ public class Snapshot {
String deltaManifestList,
@Nullable String changelogManifestList,
String commitUser,
- String commitIdentifier,
+ long commitIdentifier,
CommitKind commitKind,
long timeMillis,
Map<Integer, Long> logOffsets) {
@@ -143,7 +158,7 @@ public class Snapshot {
@JsonProperty(FIELD_DELTA_MANIFEST_LIST) String deltaManifestList,
@JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable String changelogManifestList,
@JsonProperty(FIELD_COMMIT_USER) String commitUser,
- @JsonProperty(FIELD_COMMIT_IDENTIFIER) String commitIdentifier,
+ @JsonProperty(FIELD_COMMIT_IDENTIFIER) long commitIdentifier,
@JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
@JsonProperty(FIELD_TIME_MILLIS) long timeMillis,
@JsonProperty(FIELD_LOG_OFFSETS) Map<Integer, Long> logOffsets) {
@@ -198,7 +213,7 @@ public class Snapshot {
}
@JsonGetter(FIELD_COMMIT_IDENTIFIER)
- public String commitIdentifier() {
+ public long commitIdentifier() {
return commitIdentifier;
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
index ff8b5cc7..8cdbee10 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
@@ -29,18 +29,18 @@ import java.util.Objects;
/** Manifest commit message. */
public class ManifestCommittable {
- private final String identifier;
+ private final long identifier;
private final Map<Integer, Long> logOffsets;
private final List<FileCommittable> fileCommittables;
- public ManifestCommittable(String identifier) {
+ public ManifestCommittable(long identifier) {
this.identifier = identifier;
this.logOffsets = new HashMap<>();
this.fileCommittables = new ArrayList<>();
}
public ManifestCommittable(
- String identifier,
+ long identifier,
Map<Integer, Long> logOffsets,
List<FileCommittable> fileCommittables) {
this.identifier = identifier;
@@ -61,7 +61,7 @@ public class ManifestCommittable {
logOffsets.put(bucket, offset);
}
- public String identifier() {
+ public long identifier() {
return identifier;
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
index f9615bf3..8049493a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
@@ -51,7 +51,7 @@ public class ManifestCommittableSerializer
public byte[] serialize(ManifestCommittable obj) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
- view.writeUTF(obj.identifier());
+ view.writeLong(obj.identifier());
serializeOffsets(view, obj.logOffsets());
view.writeInt(fileCommittableSerializer.getVersion());
fileCommittableSerializer.serializeList(obj.fileCommittables(), view);
@@ -80,7 +80,7 @@ public class ManifestCommittableSerializer
}
DataInputDeserializer view = new DataInputDeserializer(serialized);
- String identifier = view.readUTF();
+ long identifier = view.readLong();
Map<Integer, Long> offsets = deserializeOffsets(view);
int fileCommittableSerializerVersion = view.readInt();
List<FileCommittable> fileCommittables =
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 0157b168..65fe00b7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -158,7 +158,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
// check if a committable is already committed by its identifier
- Map<String, ManifestCommittable> identifiers = new LinkedHashMap<>();
+ Map<Long, ManifestCommittable> identifiers = new LinkedHashMap<>();
for (ManifestCommittable committable : committableList) {
identifiers.put(committable.identifier(), committable);
}
@@ -373,7 +373,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private void tryCommit(
List<ManifestEntry> tableFiles,
List<ManifestEntry> changelogFiles,
- String hash,
+ long identifier,
Map<Integer, Long> logOffsets,
Snapshot.CommitKind commitKind,
Long safeLatestSnapshotId) {
@@ -382,7 +382,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
if (tryCommitOnce(
tableFiles,
changelogFiles,
- hash,
+ identifier,
logOffsets,
commitKind,
latestSnapshotId,
@@ -395,7 +395,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private void tryOverwrite(
Predicate partitionFilter,
List<ManifestEntry> changes,
- String identifier,
+ long identifier,
Map<Integer, Long> logOffsets) {
while (true) {
Long latestSnapshotId = snapshotManager.latestSnapshotId();
@@ -435,7 +435,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private boolean tryCommitOnce(
List<ManifestEntry> tableFiles,
List<ManifestEntry> changelogFiles,
- String identifier,
+ long identifier,
Map<Integer, Long> logOffsets,
Snapshot.CommitKind commitKind,
Long latestSnapshotId,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SnapshotsTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SnapshotsTable.java
index b0d9b143..6eb0c6c1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SnapshotsTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SnapshotsTable.java
@@ -65,8 +65,7 @@ public class SnapshotsTable implements Table {
new RowType.RowField("schema_id", new BigIntType(false)),
new RowType.RowField(
"commit_user", SerializationUtils.newStringType(false)),
- new RowType.RowField(
- "commit_identifier", SerializationUtils.newStringType(false)),
+ new RowType.RowField("commit_identifier", new BigIntType(false)),
new RowType.RowField(
"commit_kind", SerializationUtils.newStringType(false)),
new RowType.RowField("commit_time", new TimestampType(false, 3))));
@@ -185,7 +184,7 @@ public class SnapshotsTable implements Table {
snapshot.id(),
snapshot.schemaId(),
StringData.fromString(snapshot.commitUser()),
- StringData.fromString(snapshot.commitIdentifier()),
+ snapshot.commitIdentifier(),
StringData.fromString(snapshot.commitKind().toString()),
TimestampData.fromLocalDateTime(
LocalDateTime.ofInstant(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
index b44c0c42..c763cc1e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
@@ -68,7 +68,7 @@ public class TableCommit implements AutoCloseable {
return commit.filterCommitted(committables);
}
- public void commit(String identifier, List<FileCommittable> fileCommittables) {
+ public void commit(long identifier, List<FileCommittable> fileCommittables) {
ManifestCommittable committable = new ManifestCommittable(identifier);
for (FileCommittable fileCommittable : fileCommittables) {
committable.addFileCommittable(fileCommittable);
@@ -93,7 +93,7 @@ public class TableCommit implements AutoCloseable {
// create an empty committable
// identifier is Long.MAX_VALUE, come from batch job
// TODO maybe it can be produced by CommitterOperator
- committable = new ManifestCommittable(String.valueOf(Long.MAX_VALUE));
+ committable = new ManifestCommittable(Long.MAX_VALUE);
}
commit.overwrite(overwritePartition, committable, new HashMap<>());
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 0e471d9f..4ecf9664 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -59,7 +59,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@@ -85,6 +84,8 @@ public class TestFileStore extends KeyValueFileStore {
private final RowDataSerializer valueSerializer;
private final String user;
+ private long commitIdentifier;
+
private TestFileStore(
String root,
CoreOptions options,
@@ -105,6 +106,8 @@ public class TestFileStore extends KeyValueFileStore {
this.keySerializer = new RowDataSerializer(keyType);
this.valueSerializer = new RowDataSerializer(valueType);
this.user = UUID.randomUUID().toString();
+
+ this.commitIdentifier = 0L;
}
public FileStoreCommitImpl newCommit() {
@@ -170,7 +173,7 @@ public class TestFileStore extends KeyValueFileStore {
Function<KeyValue, BinaryRowData> partitionCalculator,
Function<KeyValue, Integer> bucketCalculator,
boolean emptyWriter,
- String identifier,
+ Long identifier,
BiConsumer<FileStoreCommit, ManifestCommittable> commitFunction)
throws Exception {
AbstractFileStoreWrite<KeyValue> write = newWrite();
@@ -205,8 +208,7 @@ public class TestFileStore extends KeyValueFileStore {
FileStoreCommit commit = newCommit(user);
ManifestCommittable committable =
- new ManifestCommittable(
- identifier == null ? String.valueOf(new Random().nextLong()) : identifier);
+ new ManifestCommittable(identifier == null ? commitIdentifier++ : identifier);
for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter<KeyValue>>> entryWithPartition :
writers.entrySet()) {
for (Map.Entry<Integer, RecordWriter<KeyValue>> entryWithBucket :
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
index 161cfc16..d40d368c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
@@ -56,8 +56,7 @@ public class ManifestCommittableSerializerTest {
}
public static ManifestCommittable create() {
- ManifestCommittable committable =
- new ManifestCommittable(String.valueOf(new Random().nextLong()));
+ ManifestCommittable committable = new ManifestCommittable(new Random().nextLong());
addFileCommittables(committable, row(0), 0);
addFileCommittables(committable, row(0), 1);
addFileCommittables(committable, row(1), 0);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index cb2ab9aa..987ae4ec 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -166,7 +166,7 @@ public class FileStoreCommitTest {
FileUtils.deleteOrWarn(firstSnapshotPath);
// this test succeeds if this call does not fail
store.newCommit(UUID.randomUUID().toString())
- .filterCommitted(Collections.singletonList(new ManifestCommittable("dummy")));
+ .filterCommitted(Collections.singletonList(new ManifestCommittable(999)));
}
protected void testRandomConcurrentNoConflict(
@@ -459,7 +459,7 @@ public class FileStoreCommitTest {
gen::getPartition,
kv -> 0,
false,
- String.valueOf(i),
+ (long) i,
(commit, committable) -> {
commit.commit(committable, Collections.emptyMap());
committables.add(committable);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index eb3317b6..5349fa43 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -39,7 +39,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@@ -67,6 +66,8 @@ public class TestCommitThread extends Thread {
private final AbstractFileStoreWrite<KeyValue> write;
private final FileStoreCommit commit;
+ private long commitIdentifier;
+
public TestCommitThread(
RowType keyType,
RowType valueType,
@@ -93,6 +94,8 @@ public class TestCommitThread extends Thread {
this.write = safeStore.newWrite();
this.commit = testStore.newCommit(UUID.randomUUID().toString()).withCreateEmptyCommit(true);
+
+ this.commitIdentifier = 0;
}
public List<KeyValue> getResult() {
@@ -129,8 +132,7 @@ public class TestCommitThread extends Thread {
for (int i = 0; i < numWrites && !data.isEmpty(); i++) {
writeData();
}
- ManifestCommittable committable =
- new ManifestCommittable(String.valueOf(new Random().nextLong()));
+ ManifestCommittable committable = new ManifestCommittable(commitIdentifier++);
for (Map.Entry<BinaryRowData, MergeTreeWriter> entry : writers.entrySet()) {
RecordWriter.CommitIncrement inc = entry.getValue().prepareCommit(true);
committable.addFileCommittable(
@@ -143,8 +145,7 @@ public class TestCommitThread extends Thread {
private void doOverwrite() throws Exception {
BinaryRowData partition = overwriteData();
- ManifestCommittable committable =
- new ManifestCommittable(String.valueOf(new Random().nextLong()));
+ ManifestCommittable committable = new ManifestCommittable(commitIdentifier++);
RecordWriter.CommitIncrement inc = writers.get(partition).prepareCommit(true);
committable.addFileCommittable(
new FileCommittable(partition, 0, inc.newFilesIncrement(), inc.compactIncrement()));
@@ -159,10 +160,10 @@ public class TestCommitThread extends Thread {
}
private void doFinalCompact() {
+ long identifier = commitIdentifier++;
while (true) {
try {
- ManifestCommittable committable =
- new ManifestCommittable(String.valueOf(new Random().nextLong()));
+ ManifestCommittable committable = new ManifestCommittable(identifier);
for (BinaryRowData partition : writtenPartitions) {
MergeTreeWriter writer =
writers.computeIfAbsent(partition, p -> createWriter(p, false));
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 79296b4a..8bbff172 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -189,7 +189,7 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
dataset.add(new HashMap<>(dataPerBucket));
dataPerBucket.clear();
}
- commit.commit("0", write.prepareCommit(true));
+ commit.commit(0, write.prepareCommit(true));
int partition = random.nextInt(numOfPartition);
List<Integer> availableBucket = new ArrayList<>(dataset.get(partition).keySet());
@@ -220,17 +220,17 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
write.write(rowData(1, 10, 100L));
write.write(rowData(2, 20, 200L));
write.write(rowData(1, 11, 101L));
- commit.commit("0", write.prepareCommit(true));
+ commit.commit(0, write.prepareCommit(true));
write.write(rowData(1, 12, 102L));
write.write(rowData(2, 21, 201L));
write.write(rowData(2, 22, 202L));
- commit.commit("1", write.prepareCommit(true));
+ commit.commit(1, write.prepareCommit(true));
write.write(rowData(1, 11, 101L));
write.write(rowData(2, 21, 201L));
write.write(rowData(1, 12, 102L));
- commit.commit("2", write.prepareCommit(true));
+ commit.commit(2, write.prepareCommit(true));
write.close();
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index 3b4806b8..630ab6a4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -157,20 +157,20 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
write.write(rowData(1, 10, 100L));
write.write(rowData(2, 20, 200L));
write.write(rowData(1, 11, 101L));
- commit.commit("0", write.prepareCommit(true));
+ commit.commit(0, write.prepareCommit(true));
write.write(rowData(2, 21, 201L));
write.write(rowData(1, 12, 102L));
write.write(rowData(2, 21, 201L));
write.write(rowData(2, 21, 201L));
- commit.commit("1", write.prepareCommit(true));
+ commit.commit(1, write.prepareCommit(true));
write.write(rowData(1, 11, 101L));
write.write(rowData(2, 22, 202L));
write.write(rowDataWithKind(RowKind.DELETE, 2, 21, 201L));
write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
write.write(rowDataWithKind(RowKind.DELETE, 2, 21, 201L));
- commit.commit("2", write.prepareCommit(true));
+ commit.commit(2, write.prepareCommit(true));
write.close();
}
@@ -184,7 +184,7 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
// no data file should be produced from this commit
write.write(rowData(1, 10, 100L));
write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
- commit.commit("0", write.prepareCommit(true));
+ commit.commit(0, write.prepareCommit(true));
write.close();
// check that no data file is produced
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index c710cbc6..ef071883 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.utils.CompatibilityTestUtils;
import org.apache.flink.types.RowKind;
+import org.apache.flink.util.function.FunctionWithException;
import org.junit.jupiter.api.Test;
@@ -58,9 +59,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
write.write(rowData(1, 10, 200L));
write.write(rowData(1, 10, 100L));
write.write(rowData(1, 11, 101L));
- commit.commit("0", write.prepareCommit(true));
+ commit.commit(0, write.prepareCommit(true));
write.write(rowData(1, 11, 55L));
- commit.commit("1", write.prepareCommit(true));
+ commit.commit(1, write.prepareCommit(true));
write.close();
List<Split> splits = table.newScan().plan().splits();
@@ -181,7 +182,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 20, 201L));
write.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 101L));
write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 10, 102L));
- commit.commit("0", write.prepareCommit(true));
+ commit.commit(0, write.prepareCommit(true));
write.close();
List<Split> splits = table.newScan().withIncremental(true).plan().splits();
@@ -216,7 +217,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
write.write(rowDataWithKind(RowKind.DELETE, 2, 10, 210L));
write.compact(binaryRow(1), 0);
write.compact(binaryRow(2), 0);
- commit.commit("0", write.prepareCommit(true));
+ commit.commit(0, write.prepareCommit(true));
List<Split> splits = table.newScan().withIncremental(true).plan().splits();
TableRead read = table.newRead();
@@ -230,13 +231,13 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
write.write(rowData(1, 40, 140L));
write.write(rowData(2, 30, 230L));
write.write(rowData(2, 40, 240L));
- commit.commit("1", write.prepareCommit(true));
+ commit.commit(1, write.prepareCommit(true));
write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 140L));
write.write(rowData(2, 40, 241L));
write.compact(binaryRow(1), 0);
write.compact(binaryRow(2), 0);
- commit.commit("2", write.prepareCommit(true));
+ commit.commit(2, write.prepareCommit(true));
splits = table.newScan().withIncremental(true).plan().splits();
assertThat(getResult(read, splits, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING))
@@ -248,7 +249,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
write.write(rowData(1, 20, 121L));
write.write(rowData(1, 30, 131L));
write.write(rowData(2, 30, 231L));
- commit.commit("3", write.prepareCommit(true));
+ commit.commit(3, write.prepareCommit(true));
write.write(rowDataWithKind(RowKind.DELETE, 1, 20, 121L));
write.write(rowData(1, 30, 132L));
@@ -259,7 +260,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
write.write(rowData(2, 40, 242L));
write.compact(binaryRow(1), 0);
write.compact(binaryRow(2), 0);
- commit.commit("4", write.prepareCommit(true));
+ commit.commit(4, write.prepareCommit(true));
splits = table.newScan().withIncremental(true).plan().splits();
assertThat(getResult(read, splits, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING))
@@ -285,16 +286,6 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
createFileStoreTable(
conf -> conf.set(CoreOptions.CHANGELOG_PRODUCER, ChangelogProducer.INPUT));
- // write another commit
- TableWrite write = table.newWrite();
- TableCommit commit = table.newCommit("user");
- write.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 102L));
- write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 10, 103L));
- write.write(rowDataWithKind(RowKind.INSERT, 1, 20, 201L));
- write.write(rowDataWithKind(RowKind.DELETE, 2, 10, 301L));
- commit.commit("2", write.prepareCommit(true));
- write.close();
-
List<List<List<String>>> expected =
Arrays.asList(
// first changelog snapshot
@@ -335,20 +326,48 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
ChangelogProducer.INPUT,
Snapshot.FIRST_SNAPSHOT_ID - 1);
- for (int i = 0; i < 3; i++) {
- SnapshotEnumerator.EnumeratorResult result = enumerator.call();
- assertThat(result).isNotNull();
-
- List<Split> splits = result.plan.splits();
- TableRead read = table.newRead();
- for (int j = 0; j < 2; j++) {
- assertThat(getResult(read, splits, binaryRow(j + 1), 0, CHANGELOG_ROW_TO_STRING))
- .isEqualTo(expected.get(i).get(j));
- }
+ FunctionWithException<Integer, Void, Exception> assertNextSnapshot =
+ i -> {
+ SnapshotEnumerator.EnumeratorResult result = enumerator.call();
+ assertThat(result).isNotNull();
+
+ List<Split> splits = result.plan.splits();
+ TableRead read = table.newRead();
+ for (int j = 0; j < 2; j++) {
+ assertThat(
+ getResult(
+ read,
+ splits,
+ binaryRow(j + 1),
+ 0,
+ CHANGELOG_ROW_TO_STRING))
+ .isEqualTo(expected.get(i).get(j));
+ }
+
+ return null;
+ };
+
+ for (int i = 0; i < 2; i++) {
+ assertNextSnapshot.apply(i);
}
// no more changelog
assertThat(enumerator.call()).isNull();
+
+ // write another commit
+ TableWrite write = table.newWrite();
+ TableCommit commit = table.newCommit("user");
+ write.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 102L));
+ write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 10, 103L));
+ write.write(rowDataWithKind(RowKind.INSERT, 1, 20, 201L));
+ write.write(rowDataWithKind(RowKind.DELETE, 2, 10, 301L));
+ commit.commit(2, write.prepareCommit(true));
+ write.close();
+
+ assertNextSnapshot.apply(2);
+
+ // no more changelog
+ assertThat(enumerator.call()).isNull();
}
private void writeData() throws Exception {
@@ -359,19 +378,19 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
write.write(rowData(1, 10, 100L));
write.write(rowData(2, 20, 200L));
write.write(rowData(1, 11, 101L));
- commit.commit("0", write.prepareCommit(true));
+ commit.commit(0, write.prepareCommit(true));
write.write(rowData(1, 10, 1000L));
write.write(rowData(2, 21, 201L));
write.write(rowData(2, 21, 2001L));
- commit.commit("1", write.prepareCommit(true));
+ commit.commit(1, write.prepareCommit(true));
write.write(rowData(1, 11, 1001L));
write.write(rowData(2, 21, 20001L));
write.write(rowData(2, 22, 202L));
write.write(rowDataWithKind(RowKind.DELETE, 1, 11, 1001L));
write.write(rowDataWithKind(RowKind.DELETE, 2, 20, 200L));
- commit.commit("2", write.prepareCommit(true));
+ commit.commit(2, write.prepareCommit(true));
write.close();
}
@@ -386,15 +405,15 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
write.write(rowData(1, 10, 100L));
write.write(rowData(1, 20, 200L));
- commit.commit("0", write.prepareCommit(true));
+ commit.commit(0, write.prepareCommit(true));
write.write(rowData(1, 30, 300L));
write.write(rowData(1, 40, 400L));
- commit.commit("1", write.prepareCommit(true));
+ commit.commit(1, write.prepareCommit(true));
write.write(rowData(1, 50, 500L));
write.write(rowData(1, 60, 600L));
- commit.commit("2", write.prepareCommit(true));
+ commit.commit(2, write.prepareCommit(true));
PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
List<Split> splits = table.newScan().plan().splits();
@@ -427,7 +446,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
// update pk 60, 10
write.write(rowData(1, 60, 500L));
write.write(rowData(1, 10, 10L));
- commit.commit("3", write.prepareCommit(true));
+ commit.commit(3, write.prepareCommit(true));
write.close();
@@ -461,7 +480,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
write.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 100L));
write.write(rowDataWithKind(RowKind.DELETE, 1, 20, 400L));
- commit.commit("0", write.prepareCommit(true));
+ commit.commit(0, write.prepareCommit(true));
write.close();
List<Split> splits = table.newScan().plan().splits();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 94ed642c..46cd2645 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -137,7 +137,7 @@ public abstract class FileStoreTableTestBase {
TableCommit commit = table.newCommit("user");
write.write(rowData(1, 10, 100L));
write.write(rowData(2, 20, 200L));
- commit.commit("0", write.prepareCommit(true));
+ commit.commit(0, write.prepareCommit(true));
write.close();
write = table.newWrite().withOverwrite(true);
@@ -145,7 +145,7 @@ public abstract class FileStoreTableTestBase {
write.write(rowData(2, 21, 201L));
Map<String, String> overwritePartition = new HashMap<>();
overwritePartition.put("pt", "2");
- commit.withOverwritePartition(overwritePartition).commit("1", write.prepareCommit(true));
+ commit.withOverwritePartition(overwritePartition).commit(1, write.prepareCommit(true));
write.close();
List<Split> splits = table.newScan().plan().splits();
@@ -171,7 +171,7 @@ public abstract class FileStoreTableTestBase {
write.write(rowData(1, 5, 6L));
write.write(rowData(1, 7, 8L));
write.write(rowData(1, 9, 10L));
- table.newCommit("user").commit("0", write.prepareCommit(true));
+ table.newCommit("user").commit(0, write.prepareCommit(true));
write.close();
List<Split> splits =
@@ -192,15 +192,15 @@ public abstract class FileStoreTableTestBase {
write.write(rowData(1, 10, 100L));
write.write(rowData(1, 20, 200L));
- commit.commit("0", write.prepareCommit(true));
+ commit.commit(0, write.prepareCommit(true));
write.write(rowData(1, 30, 300L));
write.write(rowData(1, 40, 400L));
- commit.commit("1", write.prepareCommit(true));
+ commit.commit(1, write.prepareCommit(true));
write.write(rowData(1, 50, 500L));
write.write(rowData(1, 60, 600L));
- commit.commit("2", write.prepareCommit(true));
+ commit.commit(2, write.prepareCommit(true));
write.close();
@@ -223,7 +223,7 @@ public abstract class FileStoreTableTestBase {
for (int j = 0; j < 1000; j++) {
write.write(rowData(1, 10 * i * j, 100L * i * j));
}
- commit.commit(String.valueOf(i), write.prepareCommit(false));
+ commit.commit(i, write.prepareCommit(false));
}
write.write(rowData(1, 40, 400L));
@@ -243,9 +243,9 @@ public abstract class FileStoreTableTestBase {
// if remove writer too fast, will see old files, do another compaction
// then will be conflicts
- commit.commit("4", commit4);
- commit.commit("5", commit5);
- commit.commit("6", commit6);
+ commit.commit(4, commit4);
+ commit.commit(5, commit5);
+ commit.commit(6, commit6);
} else {
// commit4 is a compaction commit
// do compaction commit5
@@ -254,8 +254,8 @@ public abstract class FileStoreTableTestBase {
// wait compaction finish
// commit5 should be a compaction commit
- commit.commit("4", commit4);
- commit.commit("5", commit5);
+ commit.commit(4, commit4);
+ commit.commit(5, commit5);
}
write.close();
@@ -274,7 +274,7 @@ public abstract class FileStoreTableTestBase {
TableCommit commit = table.newCommit("user");
for (int i = 0; i < 10; i++) {
write.write(rowData(1, 1, 100L));
- commit.commit(String.valueOf(i), write.prepareCommit(true));
+ commit.commit(i, write.prepareCommit(true));
}
write.close();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
index 025ecb93..250cfdfa 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
@@ -86,7 +86,7 @@ public class SchemaEvolutionTest {
TableWrite write = table.newWrite();
write.write(GenericRowData.of(1, 1L));
write.write(GenericRowData.of(2, 2L));
- table.newCommit("").commit("0", write.prepareCommit(true));
+ table.newCommit("").commit(0, write.prepareCommit(true));
write.close();
schemaManager.commitChanges(
@@ -96,7 +96,7 @@ public class SchemaEvolutionTest {
write = table.newWrite();
write.write(GenericRowData.of(3, 3L, 3L));
write.write(GenericRowData.of(4, 4L, 4L));
- table.newCommit("").commit("1", write.prepareCommit(true));
+ table.newCommit("").commit(1, write.prepareCommit(true));
write.close();
// read all
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index b6a09c4a..f5c77682 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -73,7 +73,7 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
expected.add(BATCH_ROW_TO_STRING.apply(row));
}
List<FileCommittable> committables = write.prepareCommit(true);
- commit.commit("0", committables);
+ commit.commit(0, committables);
write.close();
// read
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index 3a7d785c..c4112b36 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -76,6 +76,8 @@ public class TableStoreHiveStorageHandlerITCase {
private static String engine;
+ private long commitIdentifier;
+
@BeforeClass
public static void beforeClass() {
// TODO Currently FlinkEmbeddedHiveRunner can only be used for one test class,
@@ -98,6 +100,8 @@ public class TableStoreHiveStorageHandlerITCase {
hiveShell.execute("CREATE DATABASE IF NOT EXISTS test_db");
hiveShell.execute("USE test_db");
+
+ commitIdentifier = 0;
}
@After
@@ -626,10 +630,10 @@ public class TableStoreHiveStorageHandlerITCase {
for (RowData rowData : data) {
write.write(rowData);
if (ThreadLocalRandom.current().nextInt(5) == 0) {
- commit.commit(UUID.randomUUID().toString(), write.prepareCommit(false));
+ commit.commit(commitIdentifier++, write.prepareCommit(false));
}
}
- commit.commit(UUID.randomUUID().toString(), write.prepareCommit(true));
+ commit.commit(commitIdentifier++, write.prepareCommit(true));
write.close();
String tableName = "test_table_" + (UUID.randomUUID().toString().substring(0, 4));
@@ -674,7 +678,7 @@ public class TableStoreHiveStorageHandlerITCase {
for (GenericRowData rowData : input) {
write.write(rowData);
}
- commit.commit("0", write.prepareCommit(true));
+ commit.commit(0, write.prepareCommit(true));
write.close();
hiveShell.execute(
@@ -779,17 +783,17 @@ public class TableStoreHiveStorageHandlerITCase {
TableWrite write = table.newWrite();
TableCommit commit = table.newCommit("user");
write.write(GenericRowData.of(1));
- commit.commit("0", write.prepareCommit(true));
+ commit.commit(0, write.prepareCommit(true));
write.write(GenericRowData.of((Object) null));
- commit.commit("1", write.prepareCommit(true));
+ commit.commit(1, write.prepareCommit(true));
write.write(GenericRowData.of(2));
write.write(GenericRowData.of(3));
write.write(GenericRowData.of((Object) null));
- commit.commit("2", write.prepareCommit(true));
+ commit.commit(2, write.prepareCommit(true));
write.write(GenericRowData.of(4));
write.write(GenericRowData.of(5));
write.write(GenericRowData.of(6));
- commit.commit("3", write.prepareCommit(true));
+ commit.commit(3, write.prepareCommit(true));
write.close();
hiveShell.execute(
@@ -875,16 +879,16 @@ public class TableStoreHiveStorageHandlerITCase {
375, /* 1971-01-11 */
TimestampData.fromLocalDateTime(
LocalDateTime.of(2022, 5, 17, 17, 29, 20, 100_000_000))));
- commit.commit("0", write.prepareCommit(true));
+ commit.commit(0, write.prepareCommit(true));
write.write(GenericRowData.of(null, null));
- commit.commit("1", write.prepareCommit(true));
+ commit.commit(1, write.prepareCommit(true));
write.write(GenericRowData.of(376 /* 1971-01-12 */, null));
write.write(
GenericRowData.of(
null,
TimestampData.fromLocalDateTime(
LocalDateTime.of(2022, 6, 18, 8, 30, 0, 100_000_000))));
- commit.commit("2", write.prepareCommit(true));
+ commit.commit(2, write.prepareCommit(true));
write.close();
hiveShell.execute(
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
index 08186888..79c57848 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
@@ -78,7 +78,7 @@ public class TableStoreRecordReaderTest {
write.write(GenericRowData.of(3L, StringData.fromString("World")));
write.write(GenericRowData.of(1L, StringData.fromString("Hi again")));
write.write(GenericRowData.ofKind(RowKind.DELETE, 2L, StringData.fromString("Hello")));
- commit.commit("0", write.prepareCommit(true));
+ commit.commit(0, write.prepareCommit(true));
TableStoreRecordReader reader = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
RowDataContainer container = reader.createValue();
@@ -120,7 +120,7 @@ public class TableStoreRecordReaderTest {
write.write(GenericRowData.of(1, StringData.fromString("Hi")));
write.write(GenericRowData.ofKind(RowKind.DELETE, 2, StringData.fromString("Hello")));
write.write(GenericRowData.of(1, StringData.fromString("Hi")));
- commit.commit("0", write.prepareCommit(true));
+ commit.commit(0, write.prepareCommit(true));
TableStoreRecordReader reader = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
RowDataContainer container = reader.createValue();
@@ -160,7 +160,7 @@ public class TableStoreRecordReaderTest {
write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
write.write(GenericRowData.of(2, 20L, StringData.fromString("Hello")));
write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
- commit.commit("0", write.prepareCommit(true));
+ commit.commit(0, write.prepareCommit(true));
TableStoreRecordReader reader =
read(table, BinaryRowDataUtil.EMPTY_ROW, 0, Arrays.asList("c", "a"));
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
index f31eeaea..1f2e4394 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
@@ -34,7 +34,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
/** A simple table test helper to write and commit. */
public class SimpleTableTestHelper {
@@ -42,6 +41,8 @@ public class SimpleTableTestHelper {
private final TableWrite writer;
private final TableCommit commit;
+ private long commitIdentifier;
+
public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
this(path, rowType, Collections.emptyList(), Collections.emptyList());
}
@@ -60,6 +61,8 @@ public class SimpleTableTestHelper {
FileStoreTable table = FileStoreTableFactory.create(conf);
this.writer = table.newWrite();
this.commit = table.newCommit("user");
+
+ this.commitIdentifier = 0;
}
public void write(RowData row) throws Exception {
@@ -67,6 +70,6 @@ public class SimpleTableTestHelper {
}
public void commit() throws Exception {
- commit.commit(UUID.randomUUID().toString(), writer.prepareCommit(true));
+ commit.commit(commitIdentifier++, writer.prepareCommit(true));
}
}
diff --git a/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java b/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
index 705a961a..8e7f10ef 100644
--- a/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
+++ b/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
@@ -33,7 +33,6 @@ import org.apache.flink.table.types.logical.RowType;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.UUID;
/** A simple table test helper to write and commit. */
public class SimpleTableTestHelper {
@@ -41,6 +40,8 @@ public class SimpleTableTestHelper {
private final TableWrite writer;
private final TableCommit commit;
+ private long commitIdentifier;
+
public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
Map<String, String> options = new HashMap<>();
// orc is shaded, can not find shaded classes in ide
@@ -58,6 +59,8 @@ public class SimpleTableTestHelper {
FileStoreTable table = FileStoreTableFactory.create(conf);
this.writer = table.newWrite();
this.commit = table.newCommit("user");
+
+ this.commitIdentifier = 0;
}
public void write(RowData row) throws Exception {
@@ -65,6 +68,6 @@ public class SimpleTableTestHelper {
}
public void commit() throws Exception {
- commit.commit(UUID.randomUUID().toString(), writer.prepareCommit(true));
+ commit.commit(commitIdentifier++, writer.prepareCommit(true));
}
}