You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/28 03:01:04 UTC
[flink-table-store] branch master updated: [FLINK-28257] Replace ValueKind with RowKind in table store
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 515d1f99 [FLINK-28257] Replace ValueKind with RowKind in table store
515d1f99 is described below
commit 515d1f99b27b1ec9569c025ca59d91bdf8545b54
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Jun 28 11:00:59 2022 +0800
[FLINK-28257] Replace ValueKind with RowKind in table store
This closes #176
---
.../store/connector/AlterTableCompactITCase.java | 5 ++--
.../table/store/connector/sink/StoreSinkTest.java | 28 +++++++++++-----------
.../store/connector/sink/TestFileStoreTable.java | 6 ++---
.../source/FileStoreSourceSplitGeneratorTest.java | 4 ++--
.../source/FileStoreSourceSplitReaderTest.java | 8 ++-----
.../source/TestChangelogDataReadWrite.java | 4 ++--
.../apache/flink/table/store/file/KeyValue.java | 9 +++----
.../flink/table/store/file/KeyValueSerializer.java | 5 ++--
.../{ValueKind.java => manifest/FileKind.java} | 10 ++++----
.../table/store/file/manifest/ManifestEntry.java | 7 +++---
.../file/manifest/ManifestEntrySerializer.java | 3 +--
.../flink/table/store/file/mergetree/MemTable.java | 4 ++--
.../store/file/mergetree/MergeTreeReader.java | 5 ++--
.../store/file/mergetree/SortBufferMemTable.java | 4 ++--
.../store/file/operation/FileStoreCommitImpl.java | 20 ++++++++--------
.../table/store/file/operation/FileStoreScan.java | 4 ++--
.../table/ChangelogValueCountFileStoreTable.java | 9 +++----
.../table/ChangelogWithKeyFileStoreTable.java | 21 ++++------------
.../source/ValueContentRowDataRecordIterator.java | 6 +----
.../flink/table/store/file/TestFileStore.java | 4 +++-
.../table/store/file/TestKeyValueGenerator.java | 9 +++----
.../store/file/manifest/ManifestFileMetaTest.java | 6 +----
.../file/manifest/ManifestTestDataGenerator.java | 11 ++++-----
.../table/store/file/mergetree/MergeTreeTest.java | 12 +++++-----
.../mergetree/compact/MergeFunctionTestUtils.java | 6 ++---
.../store/file/operation/FileStoreCommitTest.java | 4 ++--
.../file/operation/KeyValueFileStoreReadTest.java | 4 ++--
.../table/store/file/utils/ReusingTestData.java | 12 +++++-----
28 files changed, 105 insertions(+), 125 deletions(-)
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
index a6cf81a8..9d2708a0 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
@@ -237,10 +236,10 @@ public class AlterTableCompactITCase extends FileStoreTableITCase {
List<KeyValue> data = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
KeyValue kv = generator.next();
- if (kv.valueKind() == ValueKind.ADD) {
+ if (kv.valueKind() == RowKind.INSERT) {
data.add(kv);
} else {
- data.add(kv.replace(kv.key(), ValueKind.ADD, kv.value()));
+ data.add(kv.replace(kv.key(), RowKind.INSERT, kv.value()));
}
}
return data;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index d7f86661..d572de26 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -134,7 +134,7 @@ public class StoreSinkTest {
assertThat(fileStore.committedFiles.get(row(0)).get(0))
.isEqualTo(Collections.singletonList("DELETE-key-2-value-0/2/3"));
assertThat(fileStore.committedFiles.get(row(0)).get(1))
- .isEqualTo(Arrays.asList("ADD-key-0-value-0/0/1", "ADD-key-7-value-0/7/5"));
+ .isEqualTo(Arrays.asList("INSERT-key-0-value-0/0/1", "INSERT-key-7-value-0/7/5"));
}
@Test
@@ -149,11 +149,11 @@ public class StoreSinkTest {
GenericRowData.ofKind(RowKind.UPDATE_AFTER, 0, 4, 5),
GenericRowData.ofKind(RowKind.DELETE, 1, 0, 1));
assertThat(fileStore.committedFiles.get(row(1)).get(0))
- .isEqualTo(Collections.singletonList("ADD-key-1/0/1-value--1"));
+ .isEqualTo(Collections.singletonList("INSERT-key-1/0/1-value--1"));
assertThat(fileStore.committedFiles.get(row(0)).get(0))
- .isEqualTo(Collections.singletonList("ADD-key-0/4/5-value-1"));
+ .isEqualTo(Collections.singletonList("INSERT-key-0/4/5-value-1"));
assertThat(fileStore.committedFiles.get(row(0)).get(1))
- .isEqualTo(Arrays.asList("ADD-key-0/0/1-value-1", "ADD-key-0/2/3-value--1"));
+ .isEqualTo(Arrays.asList("INSERT-key-0/0/1-value-1", "INSERT-key-0/2/3-value--1"));
}
@Test
@@ -164,9 +164,9 @@ public class StoreSinkTest {
writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
assertThat(fileStore.committedFiles.get(row(1)).get(0))
- .isEqualTo(Collections.singletonList("ADD-key-10-value-1/10/11"));
+ .isEqualTo(Collections.singletonList("INSERT-key-10-value-1/10/11"));
assertThat(fileStore.committedFiles.get(row(0)).get(0))
- .isEqualTo(Arrays.asList("ADD-key-2-value-0/2/3", "ADD-key-8-value-0/8/9"));
+ .isEqualTo(Arrays.asList("INSERT-key-2-value-0/2/3", "INSERT-key-8-value-0/8/9"));
}
@Test
@@ -178,9 +178,9 @@ public class StoreSinkTest {
writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
assertThat(fileStore.committedFiles.get(row(1)).get(1)).isNull();
assertThat(fileStore.committedFiles.get(row(1)).get(0))
- .isEqualTo(Collections.singletonList("ADD-key-10-value-1/10/11"));
+ .isEqualTo(Collections.singletonList("INSERT-key-10-value-1/10/11"));
assertThat(fileStore.committedFiles.get(row(0)).get(0))
- .isEqualTo(Collections.singletonList("ADD-key-8-value-0/8/9"));
+ .isEqualTo(Collections.singletonList("INSERT-key-8-value-0/8/9"));
}
@Test
@@ -193,11 +193,11 @@ public class StoreSinkTest {
writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
assertThat(fileStore.committedFiles.get(row(1)).get(1))
- .isEqualTo(Collections.singletonList("ADD-key-0-value-1/0/1"));
+ .isEqualTo(Collections.singletonList("INSERT-key-0-value-1/0/1"));
assertThat(fileStore.committedFiles.get(row(1)).get(0))
- .isEqualTo(Collections.singletonList("ADD-key-10-value-1/10/11"));
+ .isEqualTo(Collections.singletonList("INSERT-key-10-value-1/10/11"));
assertThat(fileStore.committedFiles.get(row(0)).get(0))
- .isEqualTo(Collections.singletonList("ADD-key-8-value-0/8/9"));
+ .isEqualTo(Collections.singletonList("INSERT-key-8-value-0/8/9"));
}
private void writeAndAssert(StoreSink<?, ?> sink) throws Exception {
@@ -208,11 +208,11 @@ public class StoreSinkTest {
GenericRowData.of(0, 7, 5),
GenericRowData.of(1, 0, 1));
assertThat(fileStore.committedFiles.get(row(1)).get(1))
- .isEqualTo(Collections.singletonList("ADD-key-0-value-1/0/1"));
+ .isEqualTo(Collections.singletonList("INSERT-key-0-value-1/0/1"));
assertThat(fileStore.committedFiles.get(row(0)).get(0))
- .isEqualTo(Collections.singletonList("ADD-key-2-value-0/2/3"));
+ .isEqualTo(Collections.singletonList("INSERT-key-2-value-0/2/3"));
assertThat(fileStore.committedFiles.get(row(0)).get(1))
- .isEqualTo(Arrays.asList("ADD-key-0-value-0/0/1", "ADD-key-7-value-0/7/5"));
+ .isEqualTo(Arrays.asList("INSERT-key-0-value-0/0/1", "INSERT-key-7-value-0/7/5"));
}
private void writeAndCommit(StoreSink<?, ?> sink, RowData... rows) throws Exception {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
index fb2727a8..2bd7e41a 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.connector.sink;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.file.writer.RecordWriter;
@@ -85,10 +84,11 @@ public class TestFileStoreTable implements FileStoreTable {
if (store.hasPk) {
kv.replace(
record.primaryKey(),
- isInsert ? ValueKind.ADD : ValueKind.DELETE,
+ isInsert ? RowKind.INSERT : RowKind.DELETE,
record.row());
} else {
- kv.replace(record.row(), ValueKind.ADD, GenericRowData.of(isInsert ? 1L : -1L));
+ kv.replace(
+ record.row(), RowKind.INSERT, GenericRowData.of(isInsert ? 1L : -1L));
}
writer.write(kv);
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
index 76331ef4..1b20a9df 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
@@ -19,8 +19,8 @@
package org.apache.flink.table.store.connector.source;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.manifest.FileKind;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.stats.StatsTestUtils;
@@ -115,7 +115,7 @@ public class FileStoreSourceSplitGeneratorTest {
private ManifestEntry makeEntry(int partition, int bucket, String fileName) {
return new ManifestEntry(
- ValueKind.ADD,
+ FileKind.ADD,
row(partition), // not used
bucket, // not used
0, // not used
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
index 9e3e17ae..b5d39f2e 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.connector.file.src.util.RecordAndPosition;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.types.RowKind;
@@ -142,15 +141,12 @@ public class FileStoreSourceSplitReaderTest {
new KeyValue()
.replace(
GenericRowData.of(tuple2.f0),
- ValueKind.ADD,
+ RowKind.INSERT,
GenericRowData.of(tuple2.f1)));
}
writer.write(
new KeyValue()
- .replace(
- GenericRowData.of(222L),
- ValueKind.DELETE,
- GenericRowData.of(333L)));
+ .replace(GenericRowData.of(222L), RowKind.DELETE, GenericRowData.of(333L)));
List<DataFileMeta> files = writer.prepareCommit().newFiles();
writer.close();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 1f32f9bb..66c66dd0 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
@@ -47,6 +46,7 @@ import org.apache.flink.table.store.table.source.ValueCountRowDataRecordIterator
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
@@ -141,7 +141,7 @@ public class TestChangelogDataReadWrite {
new KeyValue()
.replace(
GenericRowData.of(tuple2.f0),
- ValueKind.ADD,
+ RowKind.INSERT,
GenericRowData.of(tuple2.f1)));
}
List<DataFileMeta> files = writer.prepareCommit().newFiles();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
index a5baa708..1978220c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.types.RowKind;
import java.util.ArrayList;
import java.util.List;
@@ -38,7 +39,7 @@ public class KeyValue {
private RowData key;
private long sequenceNumber;
- private ValueKind valueKind;
+ private RowKind valueKind;
private RowData value;
public KeyValue setValue(RowData value) {
@@ -46,11 +47,11 @@ public class KeyValue {
return this;
}
- public KeyValue replace(RowData key, ValueKind valueKind, RowData value) {
+ public KeyValue replace(RowData key, RowKind valueKind, RowData value) {
return replace(key, -1, valueKind, value);
}
- public KeyValue replace(RowData key, long sequenceNumber, ValueKind valueKind, RowData value) {
+ public KeyValue replace(RowData key, long sequenceNumber, RowKind valueKind, RowData value) {
this.key = key;
this.sequenceNumber = sequenceNumber;
this.valueKind = valueKind;
@@ -66,7 +67,7 @@ public class KeyValue {
return sequenceNumber;
}
- public ValueKind valueKind() {
+ public RowKind valueKind() {
return valueKind;
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueSerializer.java
index 3719e708..fadcd236 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueSerializer.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.store.file.utils.ObjectSerializer;
import org.apache.flink.table.store.file.utils.OffsetRowData;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
/**
* Serializer for {@link KeyValue}.
@@ -64,7 +65,7 @@ public class KeyValueSerializer extends ObjectSerializer<KeyValue> {
return toRow(record.key(), record.sequenceNumber(), record.valueKind(), record.value());
}
- public RowData toRow(RowData key, long sequenceNumber, ValueKind valueKind, RowData value) {
+ public RowData toRow(RowData key, long sequenceNumber, RowKind valueKind, RowData value) {
reusedMeta.setField(0, sequenceNumber);
reusedMeta.setField(1, valueKind.toByteValue());
return reusedRow.replace(reusedKeyWithMeta.replace(key, reusedMeta), value);
@@ -75,7 +76,7 @@ public class KeyValueSerializer extends ObjectSerializer<KeyValue> {
reusedKey.replace(row);
reusedValue.replace(row);
long sequenceNumber = row.getLong(keyArity);
- ValueKind valueKind = ValueKind.fromByteValue(row.getByte(keyArity + 1));
+ RowKind valueKind = RowKind.fromByteValue(row.getByte(keyArity + 1));
reusedKv.replace(reusedKey, sequenceNumber, valueKind, reusedValue);
return reusedKv;
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/ValueKind.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/FileKind.java
similarity index 87%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/ValueKind.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/FileKind.java
index 685d3cb2..c393ad55 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/ValueKind.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/FileKind.java
@@ -16,17 +16,17 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file;
+package org.apache.flink.table.store.file.manifest;
-/** Value kind of a record. */
-public enum ValueKind {
+/** Kind of a file. */
+public enum FileKind {
ADD((byte) 0),
DELETE((byte) 1);
private final byte value;
- ValueKind(byte value) {
+ FileKind(byte value) {
this.value = value;
}
@@ -34,7 +34,7 @@ public enum ValueKind {
return value;
}
- public static ValueKind fromByteValue(byte value) {
+ public static FileKind fromByteValue(byte value) {
switch (value) {
case 0:
return ADD;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
index c857df6e..9fcc0512 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.file.manifest;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
@@ -34,7 +33,7 @@ import static org.apache.flink.table.store.file.utils.SerializationUtils.newByte
/** Entry of a manifest file, representing an addition / deletion of a data file. */
public class ManifestEntry {
- private final ValueKind kind;
+ private final FileKind kind;
// for tables without partition this field should be a row with 0 columns (not null)
private final BinaryRowData partition;
private final int bucket;
@@ -42,7 +41,7 @@ public class ManifestEntry {
private final DataFileMeta file;
public ManifestEntry(
- ValueKind kind,
+ FileKind kind,
BinaryRowData partition,
int bucket,
int totalBuckets,
@@ -54,7 +53,7 @@ public class ManifestEntry {
this.file = file;
}
- public ValueKind kind() {
+ public FileKind kind() {
return kind;
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java
index 2f5874c6..f1e5a423 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.file.manifest;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.data.DataFileMetaSerializer;
import org.apache.flink.table.store.file.utils.VersionedObjectSerializer;
@@ -67,7 +66,7 @@ public class ManifestEntrySerializer extends VersionedObjectSerializer<ManifestE
throw new IllegalArgumentException("Unsupported version: " + version);
}
return new ManifestEntry(
- ValueKind.fromByteValue(row.getByte(0)),
+ FileKind.fromByteValue(row.getByte(0)),
deserializeBinaryRow(row.getBinary(1)),
row.getInt(2),
row.getInt(3),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java
index 944516cd..bca182ed 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java
@@ -20,8 +20,8 @@ package org.apache.flink.table.store.file.mergetree;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.types.RowKind;
import java.io.IOException;
import java.util.Comparator;
@@ -38,7 +38,7 @@ public interface MemTable {
*
* @return True, if the record was successfully written, false, if the mem table was full.
*/
- boolean put(long sequenceNumber, ValueKind valueKind, RowData key, RowData value)
+ boolean put(long sequenceNumber, RowKind valueKind, RowData key, RowData value)
throws IOException;
/** Record size of this table. */
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
index fbf17b83..da39d0a3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.file.mergetree;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFileReader;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
@@ -28,6 +27,7 @@ import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader.Re
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.SortMergeReader;
import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.types.RowKind;
import javax.annotation.Nullable;
@@ -84,7 +84,8 @@ public class MergeTreeReader implements RecordReader<KeyValue> {
return null;
}
- if (kv.valueKind() == ValueKind.ADD) {
+ if (kv.valueKind() == RowKind.INSERT
+ || kv.valueKind() == RowKind.UPDATE_AFTER) {
return kv;
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
index 9258d2b6..4dafb52f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
@@ -31,11 +31,11 @@ import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.table.store.codegen.CodeGenUtils;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueSerializer;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.MutableObjectIterator;
import java.io.IOException;
@@ -83,7 +83,7 @@ public class SortBufferMemTable implements MemTable {
}
@Override
- public boolean put(long sequenceNumber, ValueKind valueKind, RowData key, RowData value)
+ public boolean put(long sequenceNumber, RowKind valueKind, RowData key, RowData value)
throws IOException {
return buffer.write(serializer.toRow(key, sequenceNumber, valueKind, value));
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index a2fef4fc..f6a1404e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -23,8 +23,8 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.Snapshot;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.manifest.FileKind;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.manifest.ManifestFile;
@@ -166,7 +166,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
LOG.debug("Ready to commit\n" + committable.toString());
}
- List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
+ List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), FileKind.ADD);
if (!appendChanges.isEmpty()) {
tryCommit(
appendChanges,
@@ -177,8 +177,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
List<ManifestEntry> compactChanges = new ArrayList<>();
- compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
- compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
+ compactChanges.addAll(collectChanges(committable.compactBefore(), FileKind.DELETE));
+ compactChanges.addAll(collectChanges(committable.compactAfter(), FileKind.ADD));
if (!compactChanges.isEmpty()) {
tryCommit(
compactChanges,
@@ -202,7 +202,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
+ committable.toString());
}
- List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
+ List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), FileKind.ADD);
// sanity check, all changes must be done within the given partition
Predicate partitionFilter = PredicateConverter.fromMap(partition, partitionType);
if (partitionFilter != null) {
@@ -222,8 +222,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
partitionFilter, appendChanges, committable.identifier(), committable.logOffsets());
List<ManifestEntry> compactChanges = new ArrayList<>();
- compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
- compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
+ compactChanges.addAll(collectChanges(committable.compactBefore(), FileKind.DELETE));
+ compactChanges.addAll(collectChanges(committable.compactAfter(), FileKind.ADD));
if (!compactChanges.isEmpty()) {
tryCommit(
compactChanges,
@@ -267,7 +267,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
for (ManifestEntry entry : currentEntries) {
changesWithOverwrite.add(
new ManifestEntry(
- ValueKind.DELETE,
+ FileKind.DELETE,
entry.partition(),
entry.bucket(),
entry.totalBuckets(),
@@ -289,7 +289,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
private List<ManifestEntry> collectChanges(
- Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> map, ValueKind kind) {
+ Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> map, FileKind kind) {
List<ManifestEntry> changes = new ArrayList<>();
for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> entryWithPartition :
map.entrySet()) {
@@ -466,7 +466,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
Set<ManifestEntry.Identifier> removedFiles =
changes.stream()
- .filter(e -> e.kind().equals(ValueKind.DELETE))
+ .filter(e -> e.kind().equals(FileKind.DELETE))
.map(ManifestEntry::identifier)
.collect(Collectors.toSet());
if (removedFiles.isEmpty()) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index 3d21746b..307c7503 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -19,8 +19,8 @@
package org.apache.flink.table.store.file.operation;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.manifest.FileKind;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
import org.apache.flink.table.store.file.predicate.Predicate;
@@ -70,7 +70,7 @@ public interface FileStoreScan {
List<ManifestEntry> files = files();
Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = new HashMap<>();
for (ManifestEntry entry : files) {
- checkArgument(entry.kind() == ValueKind.ADD);
+ checkArgument(entry.kind() == FileKind.ADD);
groupBy.computeIfAbsent(entry.partition(), k -> new HashMap<>())
.computeIfAbsent(entry.bucket(), k -> new ArrayList<>())
.add(entry.file());
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 842258fd..5319c7b5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueFileStore;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
@@ -47,6 +46,7 @@ import org.apache.flink.table.store.table.source.ValueCountRowDataRecordIterator
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
/** {@link FileStoreTable} for {@link WriteMode#CHANGE_LOG} write mode without primary keys. */
public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
@@ -130,18 +130,19 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
new SinkRecordConverter(store.options().bucket(), tableSchema);
return new MemoryTableWrite<KeyValue>(store.newWrite(), recordConverter, store.options()) {
+ private final KeyValue kv = new KeyValue();
+
@Override
protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer)
throws Exception {
- KeyValue kv = new KeyValue();
switch (record.row().getRowKind()) {
case INSERT:
case UPDATE_AFTER:
- kv.replace(record.row(), ValueKind.ADD, GenericRowData.of(1L));
+ kv.replace(record.row(), RowKind.INSERT, GenericRowData.of(1L));
break;
case UPDATE_BEFORE:
case DELETE:
- kv.replace(record.row(), ValueKind.ADD, GenericRowData.of(-1L));
+ kv.replace(record.row(), RowKind.INSERT, GenericRowData.of(-1L));
break;
default:
throw new UnsupportedOperationException(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 337c3c9d..bb26efa0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueFileStore;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
@@ -178,24 +177,14 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
SinkRecordConverter recordConverter =
new SinkRecordConverter(store.options().bucket(), tableSchema);
return new MemoryTableWrite<KeyValue>(store.newWrite(), recordConverter, store.options()) {
+
+ private final KeyValue kv = new KeyValue();
+
@Override
protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer)
throws Exception {
- KeyValue kv = new KeyValue();
- switch (record.row().getRowKind()) {
- case INSERT:
- case UPDATE_AFTER:
- kv.replace(record.primaryKey(), ValueKind.ADD, record.row());
- break;
- case UPDATE_BEFORE:
- case DELETE:
- kv.replace(record.primaryKey(), ValueKind.DELETE, record.row());
- break;
- default:
- throw new UnsupportedOperationException(
- "Unknown row kind " + record.row().getRowKind());
- }
- writer.write(kv);
+ writer.write(
+ kv.replace(record.primaryKey(), record.row().getRowKind(), record.row()));
}
};
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueContentRowDataRecordIterator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueContentRowDataRecordIterator.java
index 716f4838..3f442201 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueContentRowDataRecordIterator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueContentRowDataRecordIterator.java
@@ -20,9 +20,7 @@ package org.apache.flink.table.store.table.source;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.types.RowKind;
import java.io.IOException;
@@ -41,9 +39,7 @@ public class ValueContentRowDataRecordIterator extends ResetRowKindRecordIterato
}
RowData rowData = kv.value();
- if (kv.valueKind() == ValueKind.DELETE) {
- rowData.setRowKind(RowKind.DELETE);
- }
+ rowData.setRowKind(kv.valueKind());
return rowData;
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 1c4ff107..bf3fce36 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -309,9 +309,11 @@ public class TestFileStore extends KeyValueFileStore {
BinaryRowData key = keySerializer.toBinaryRow(kv.key()).copy();
BinaryRowData value = valueSerializer.toBinaryRow(kv.value()).copy();
switch (kv.valueKind()) {
- case ADD:
+ case INSERT:
+ case UPDATE_AFTER:
result.put(key, value);
break;
+ case UPDATE_BEFORE:
case DELETE:
result.remove(key);
break;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
index 29dc549d..90a02764 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
import javax.annotation.Nullable;
@@ -169,12 +170,12 @@ public class TestKeyValueGenerator {
public KeyValue next() {
int op = random.nextInt(5);
Order order = null;
- ValueKind kind = ValueKind.ADD;
+ RowKind kind = RowKind.INSERT;
if (op == 0 && addedOrders.size() > 0) {
// delete order
order = pick(addedOrders);
deletedOrders.add(order);
- kind = ValueKind.DELETE;
+ kind = RowKind.DELETE;
} else if (op == 1) {
// update order
if (random.nextBoolean() && deletedOrders.size() > 0) {
@@ -185,14 +186,14 @@ public class TestKeyValueGenerator {
if (order != null) {
order.update();
addedOrders.add(order);
- kind = ValueKind.ADD;
+ kind = RowKind.INSERT;
}
}
if (order == null) {
// new order
order = new Order();
addedOrders.add(order);
- kind = ValueKind.ADD;
+ kind = RowKind.INSERT;
}
return new KeyValue()
.replace(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index 32b0051e..cf89de9b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -24,14 +24,12 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.stats.StatsTestUtils;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
@@ -56,8 +54,6 @@ import static org.assertj.core.api.Assertions.assertThat;
public class ManifestFileMetaTest {
private static final RowType PARTITION_TYPE = RowType.of(new IntType());
- private static final RowType KEY_TYPE = RowType.of(new IntType());
- private static final RowType ROW_TYPE = RowType.of(new BigIntType());
private final FileFormat avro;
@@ -236,7 +232,7 @@ public class ManifestFileMetaTest {
writer.complete();
return new ManifestEntry(
- isAdd ? ValueKind.ADD : ValueKind.DELETE,
+ isAdd ? FileKind.ADD : FileKind.DELETE,
binaryRowData, // not used
0, // not used
0, // not used
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
index 11fc3813..2eff6284 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file.manifest;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.data.DataFileTestDataGenerator;
import org.apache.flink.table.store.file.stats.FieldStatsCollector;
import org.apache.flink.util.Preconditions;
@@ -72,7 +71,7 @@ public class ManifestTestDataGenerator {
level.add(file);
bufferedResults.push(
new ManifestEntry(
- ValueKind.ADD, file.partition, file.bucket, numBuckets, file.meta));
+ FileKind.ADD, file.partition, file.bucket, numBuckets, file.meta));
mergeLevelsIfNeeded(file.partition, file.bucket);
return bufferedResults.poll();
@@ -89,7 +88,7 @@ public class ManifestTestDataGenerator {
long numDeletedFiles = 0;
for (ManifestEntry entry : entries) {
collector.collect(entry.partition());
- if (entry.kind() == ValueKind.ADD) {
+ if (entry.kind() == FileKind.ADD) {
numAddedFiles++;
} else {
numDeletedFiles++;
@@ -121,7 +120,7 @@ public class ManifestTestDataGenerator {
for (DataFileTestDataGenerator.Data file : currentLevel) {
bufferedResults.push(
new ManifestEntry(
- ValueKind.DELETE, partition, bucket, numBuckets, file.meta));
+ FileKind.DELETE, partition, bucket, numBuckets, file.meta));
kvs.addAll(file.content);
}
currentLevel.clear();
@@ -129,7 +128,7 @@ public class ManifestTestDataGenerator {
for (DataFileTestDataGenerator.Data file : nextLevel) {
bufferedResults.push(
new ManifestEntry(
- ValueKind.DELETE, partition, bucket, numBuckets, file.meta));
+ FileKind.DELETE, partition, bucket, numBuckets, file.meta));
kvs.addAll(file.content);
}
nextLevel.clear();
@@ -140,7 +139,7 @@ public class ManifestTestDataGenerator {
nextLevel.addAll(merged);
for (DataFileTestDataGenerator.Data file : nextLevel) {
bufferedResults.push(
- new ManifestEntry(ValueKind.ADD, partition, bucket, numBuckets, file.meta));
+ new ManifestEntry(FileKind.ADD, partition, bucket, numBuckets, file.meta));
}
lastModifiedLevel += 1;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 72f58376..69ca3a6f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowDataUtil;
import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFileReader;
import org.apache.flink.table.store.file.data.DataFileWriter;
@@ -46,6 +45,7 @@ import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -199,7 +199,7 @@ public class MergeTreeTest {
for (int j = 0; j < perBatch; j++) {
records.add(
new TestRecord(
- random.nextBoolean() ? ValueKind.ADD : ValueKind.DELETE,
+ random.nextBoolean() ? RowKind.INSERT : RowKind.DELETE,
random.nextInt(perBatch / 2) - i * (perBatch / 2),
random.nextInt()));
}
@@ -351,7 +351,7 @@ public class MergeTreeTest {
}
if (dropDelete) {
return map.values().stream()
- .filter(record -> record.kind == ValueKind.ADD)
+ .filter(record -> record.kind == RowKind.INSERT)
.collect(Collectors.toList());
}
return new ArrayList<>(map.values());
@@ -394,7 +394,7 @@ public class MergeTreeTest {
for (int i = 0; i < perBatch; i++) {
records.add(
new TestRecord(
- random.nextBoolean() ? ValueKind.ADD : ValueKind.DELETE,
+ random.nextBoolean() ? RowKind.INSERT : RowKind.DELETE,
random.nextInt(perBatch / 2),
random.nextInt()));
}
@@ -403,11 +403,11 @@ public class MergeTreeTest {
private static class TestRecord {
- private final ValueKind kind;
+ private final RowKind kind;
private final int k;
private final int v;
- private TestRecord(ValueKind kind, int k, int v) {
+ private TestRecord(RowKind kind, int k, int v) {
this.kind = kind;
this.k = k;
this.v = v;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java
index 83dc92d3..4ff9b083 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java
@@ -18,8 +18,8 @@
package org.apache.flink.table.store.file.mergetree.compact;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.utils.ReusingTestData;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
@@ -38,13 +38,13 @@ public class MergeFunctionTestUtils {
for (int i = 0; i < input.size(); i++) {
ReusingTestData data = input.get(i);
Preconditions.checkArgument(
- data.valueKind == ValueKind.ADD,
+ data.valueKind == RowKind.INSERT,
"Only ADD value kind is supported for value count merge function.");
c += data.value;
if (i + 1 >= input.size() || data.key != input.get(i + 1).key) {
if (c != 0) {
expected.add(
- new ReusingTestData(data.key, data.sequenceNumber, ValueKind.ADD, c));
+ new ReusingTestData(data.key, data.sequenceNumber, RowKind.INSERT, c));
}
c = 0;
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index 5c07c31e..d75949cf 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -24,13 +24,13 @@ import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.TestFileStore;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
+import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -315,7 +315,7 @@ public class FileStoreCommitTest {
private List<KeyValue> kvMapToKvList(Map<BinaryRowData, BinaryRowData> map) {
return map.entrySet().stream()
- .map(e -> new KeyValue().replace(e.getKey(), -1, ValueKind.ADD, e.getValue()))
+ .map(e -> new KeyValue().replace(e.getKey(), -1, RowKind.INSERT, e.getValue()))
.collect(Collectors.toList());
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
index af4f774c..30866470 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.TestFileStore;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
-import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
@@ -36,6 +35,7 @@ import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -75,7 +75,7 @@ public class KeyValueFileStoreReadTest {
.replace(
GenericRowData.of(a, b, c),
i,
- ValueKind.ADD,
+ RowKind.INSERT,
GenericRowData.of(delta)));
}
// remove zero occurrence, it might be merged and discarded by the merge tree
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java
index 25581938..44c1555f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java
@@ -19,7 +19,7 @@
package org.apache.flink.table.store.file.utils;
import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
@@ -39,10 +39,10 @@ public class ReusingTestData implements Comparable<ReusingTestData> {
public final int key;
public final long sequenceNumber;
- public final ValueKind valueKind;
+ public final RowKind valueKind;
public final long value;
- public ReusingTestData(int key, long sequenceNumber, ValueKind valueKind, long value) {
+ public ReusingTestData(int key, long sequenceNumber, RowKind valueKind, long value) {
this.key = key;
this.sequenceNumber = sequenceNumber;
this.valueKind = valueKind;
@@ -89,7 +89,7 @@ public class ReusingTestData implements Comparable<ReusingTestData> {
new ReusingTestData(
Integer.parseInt(split[0].trim()),
Long.parseLong(split[1].trim()),
- split[2].trim().equals("+") ? ValueKind.ADD : ValueKind.DELETE,
+ split[2].trim().equals("+") ? RowKind.INSERT : RowKind.DELETE,
Long.parseLong(split[3].trim())));
}
return result;
@@ -104,7 +104,7 @@ public class ReusingTestData implements Comparable<ReusingTestData> {
new ReusingTestData(
random.nextInt(numRecords),
generator.next(),
- random.nextBoolean() || onlyAdd ? ValueKind.ADD : ValueKind.DELETE,
+ random.nextBoolean() || onlyAdd ? RowKind.INSERT : RowKind.DELETE,
random.nextInt(10) - 5));
}
return result;
@@ -122,7 +122,7 @@ public class ReusingTestData implements Comparable<ReusingTestData> {
new ReusingTestData(
key,
generator.next(),
- random.nextBoolean() || onlyAdd ? ValueKind.ADD : ValueKind.DELETE,
+ random.nextBoolean() || onlyAdd ? RowKind.INSERT : RowKind.DELETE,
random.nextInt(10) - 5));
}
return new ArrayList<>(result.values());