You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/14 14:14:35 UTC
[flink-table-store] branch master updated: [FLINK-26458] Rename Accumulator to MergeFunction
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new bdda9cf [FLINK-26458] Rename Accumulator to MergeFunction
bdda9cf is described below
commit bdda9cfee81627fd8b5c290566b1cb5104ca33ad
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Mon Mar 14 22:14:27 2022 +0800
[FLINK-26458] Rename Accumulator to MergeFunction
This closes #42
---
.../flink/table/store/connector/TableStore.java | 14 +++++-----
.../store/connector/source/TestDataReadWrite.java | 11 ++++++--
.../flink/table/store/file/FileStoreImpl.java | 12 ++++----
.../flink/table/store/file/mergetree/MemTable.java | 6 ++--
.../store/file/mergetree/MergeTreeReader.java | 12 ++++----
.../store/file/mergetree/MergeTreeWriter.java | 10 +++----
.../store/file/mergetree/SortBufferMemTable.java | 23 ++++++++--------
...umulator.java => DeduplicateMergeFunction.java} | 8 +++---
.../{Accumulator.java => MergeFunction.java} | 14 +++++-----
.../file/mergetree/compact/SortMergeReader.java | 20 +++++++-------
...cumulator.java => ValueCountMergeFunction.java} | 8 +++---
.../store/file/operation/FileStoreReadImpl.java | 10 +++----
.../store/file/operation/FileStoreWriteImpl.java | 12 ++++----
.../flink/table/store/file/TestFileStore.java | 10 +++----
.../table/store/file/mergetree/MergeTreeTest.java | 8 +++---
.../file/mergetree/SortBufferMemTableTestBase.java | 32 +++++++++++-----------
...rTestUtils.java => MergeFunctionTestUtils.java} | 6 ++--
.../mergetree/compact/SortMergeReaderTestBase.java | 24 ++++++++--------
.../store/file/operation/FileStoreCommitTest.java | 4 +--
.../store/file/operation/FileStoreExpireTest.java | 4 +--
.../store/file/operation/FileStoreReadTest.java | 17 +++++++-----
.../store/file/operation/FileStoreScanTest.java | 4 +--
22 files changed, 140 insertions(+), 129 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index 609cee9..b67bf8f 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -41,9 +41,9 @@ import org.apache.flink.table.store.connector.source.LogHybridSourceFactory;
import org.apache.flink.table.store.connector.source.StaticFileStoreSplitEnumerator;
import org.apache.flink.table.store.file.FileStore;
import org.apache.flink.table.store.file.FileStoreImpl;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
-import org.apache.flink.table.store.file.mergetree.compact.ValueCountAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.log.LogSinkProvider;
import org.apache.flink.table.store.log.LogSourceProvider;
@@ -120,11 +120,11 @@ public class TableStore {
RowType partitionType = ProjectionUtils.project(type, partitions);
RowType keyType;
RowType valueType;
- Accumulator accumulator;
+ MergeFunction mergeFunction;
if (primaryKeys.length == 0) {
keyType = type;
valueType = RowType.of(new BigIntType(false));
- accumulator = new ValueCountAccumulator();
+ mergeFunction = new ValueCountMergeFunction();
} else {
List<RowType.RowField> fields = ProjectionUtils.project(type, primaryKeys).getFields();
// add _KEY_ prefix to avoid conflict with value
@@ -139,9 +139,9 @@ public class TableStore {
f.getDescription().orElse(null)))
.collect(Collectors.toList()));
valueType = type;
- accumulator = new DeduplicateAccumulator();
+ mergeFunction = new DeduplicateMergeFunction();
}
- return new FileStoreImpl(options, user, partitionType, keyType, valueType, accumulator);
+ return new FileStoreImpl(options, user, partitionType, keyType, valueType, mergeFunction);
}
/** Source builder to build a flink {@link Source}. */
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
index df68c95..80ca7b1 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
import org.apache.flink.table.store.file.operation.FileStoreRead;
import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
@@ -73,7 +73,12 @@ public class TestDataReadWrite {
public FileStoreRead createRead() {
return new FileStoreReadImpl(
- KEY_TYPE, VALUE_TYPE, COMPARATOR, new DeduplicateAccumulator(), avro, pathFactory);
+ KEY_TYPE,
+ VALUE_TYPE,
+ COMPARATOR,
+ new DeduplicateMergeFunction(),
+ avro,
+ pathFactory);
}
public List<SstFileMeta> writeFiles(
@@ -95,7 +100,7 @@ public class TestDataReadWrite {
KEY_TYPE,
VALUE_TYPE,
COMPARATOR,
- new DeduplicateAccumulator(),
+ new DeduplicateMergeFunction(),
avro,
pathFactory,
null, // not used, we only create an empty writer
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index 2d96faf..bc779c9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.store.file.manifest.ManifestFile;
import org.apache.flink.table.store.file.manifest.ManifestList;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.operation.FileStoreCommitImpl;
import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
@@ -47,7 +47,7 @@ public class FileStoreImpl implements FileStore {
private final RowType partitionType;
private final RowType keyType;
private final RowType valueType;
- private final Accumulator accumulator;
+ private final MergeFunction mergeFunction;
private final GeneratedRecordComparator genRecordComparator;
public FileStoreImpl(
@@ -56,13 +56,13 @@ public class FileStoreImpl implements FileStore {
RowType partitionType,
RowType keyType,
RowType valueType,
- Accumulator accumulator) {
+ MergeFunction mergeFunction) {
this.options = new FileStoreOptions(options);
this.user = user;
this.partitionType = partitionType;
this.keyType = keyType;
this.valueType = valueType;
- this.accumulator = accumulator;
+ this.mergeFunction = mergeFunction;
this.genRecordComparator =
new SortCodeGenerator(
new TableConfig(),
@@ -104,7 +104,7 @@ public class FileStoreImpl implements FileStore {
keyType,
valueType,
newKeyComparator(),
- accumulator,
+ mergeFunction,
options.fileFormat(),
pathFactory(),
newScan(),
@@ -117,7 +117,7 @@ public class FileStoreImpl implements FileStore {
keyType,
valueType,
newKeyComparator(),
- accumulator,
+ mergeFunction,
options.fileFormat(),
pathFactory());
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java
index d6ea49b..3dd0054 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.file.mergetree;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import java.io.IOException;
import java.util.Comparator;
@@ -47,9 +47,9 @@ public interface MemTable {
/**
* Returns an iterator over the records in this table. The elements are returned in the order of
* key and sequence number and elements with the same key will be merged by the given {@link
- * Accumulator}.
+ * MergeFunction}.
*/
- Iterator<KeyValue> iterator(Comparator<RowData> keyComparator, Accumulator accumulator);
+ Iterator<KeyValue> iterator(Comparator<RowData> keyComparator, MergeFunction mergeFunction);
/** Removes all records from this table. The table will be empty after this call returns. */
void clear();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
index 01876ca..7e56db4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
@@ -21,9 +21,9 @@ package org.apache.flink.table.store.file.mergetree;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader.ReaderSupplier;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.SortMergeReader;
import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
import org.apache.flink.table.store.file.mergetree.sst.SstFileReader;
@@ -48,14 +48,16 @@ public class MergeTreeReader implements RecordReader {
boolean dropDelete,
SstFileReader sstFileReader,
Comparator<RowData> userKeyComparator,
- Accumulator accumulator)
+ MergeFunction mergeFunction)
throws IOException {
this.dropDelete = dropDelete;
List<ReaderSupplier> readers = new ArrayList<>();
for (List<SortedRun> section : sections) {
readers.add(
- () -> readerForSection(section, sstFileReader, userKeyComparator, accumulator));
+ () ->
+ readerForSection(
+ section, sstFileReader, userKeyComparator, mergeFunction));
}
this.reader = ConcatRecordReader.create(readers);
}
@@ -104,13 +106,13 @@ public class MergeTreeReader implements RecordReader {
List<SortedRun> section,
SstFileReader sstFileReader,
Comparator<RowData> userKeyComparator,
- Accumulator accumulator)
+ MergeFunction mergeFunction)
throws IOException {
List<RecordReader> readers = new ArrayList<>();
for (SortedRun run : section) {
readers.add(readerForRun(run, sstFileReader));
}
- return SortMergeReader.create(readers, userKeyComparator, accumulator);
+ return SortMergeReader.create(readers, userKeyComparator, mergeFunction);
}
public static RecordReader readerForRun(SortedRun run, SstFileReader sstFileReader)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 89771f2..4565ad5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -22,8 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
import org.apache.flink.table.store.file.mergetree.sst.SstFileWriter;
import org.apache.flink.table.store.file.utils.RecordWriter;
@@ -51,7 +51,7 @@ public class MergeTreeWriter implements RecordWriter {
private final Comparator<RowData> keyComparator;
- private final Accumulator accumulator;
+ private final MergeFunction mergeFunction;
private final SstFileWriter sstFileWriter;
@@ -71,7 +71,7 @@ public class MergeTreeWriter implements RecordWriter {
Levels levels,
long maxSequenceNumber,
Comparator<RowData> keyComparator,
- Accumulator accumulator,
+ MergeFunction mergeFunction,
SstFileWriter sstFileWriter,
boolean commitForceCompact) {
this.memTable = memTable;
@@ -79,7 +79,7 @@ public class MergeTreeWriter implements RecordWriter {
this.levels = levels;
this.newSequenceNumber = maxSequenceNumber + 1;
this.keyComparator = keyComparator;
- this.accumulator = accumulator;
+ this.mergeFunction = mergeFunction;
this.sstFileWriter = sstFileWriter;
this.commitForceCompact = commitForceCompact;
this.newFiles = new LinkedHashSet<>();
@@ -112,7 +112,7 @@ public class MergeTreeWriter implements RecordWriter {
private void flush() throws Exception {
if (memTable.size() > 0) {
finishCompaction();
- Iterator<KeyValue> iterator = memTable.iterator(keyComparator, accumulator);
+ Iterator<KeyValue> iterator = memTable.iterator(keyComparator, mergeFunction);
List<SstFileMeta> files =
sstFileWriter.write(CloseableIterator.adapterForIterator(iterator), 0);
newFiles.addAll(files);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
index 581964a..441721b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
@@ -32,7 +32,7 @@ import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueSerializer;
import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.utils.HeapMemorySegmentPool;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -106,10 +106,11 @@ public class SortBufferMemTable implements MemTable {
}
@Override
- public Iterator<KeyValue> iterator(Comparator<RowData> keyComparator, Accumulator accumulator) {
+ public Iterator<KeyValue> iterator(
+ Comparator<RowData> keyComparator, MergeFunction mergeFunction) {
new QuickSort().sort(buffer);
MutableObjectIterator<BinaryRowData> kvIter = buffer.getIterator();
- return new MemTableIterator(kvIter, keyComparator, accumulator);
+ return new MemTableIterator(kvIter, keyComparator, mergeFunction);
}
@Override
@@ -120,9 +121,9 @@ public class SortBufferMemTable implements MemTable {
private class MemTableIterator implements Iterator<KeyValue> {
private final MutableObjectIterator<BinaryRowData> kvIter;
private final Comparator<RowData> keyComparator;
- private final Accumulator accumulator;
+ private final MergeFunction mergeFunction;
- // holds the accumulated value
+ // holds the merged value
private KeyValueSerializer previous;
private BinaryRowData previousRow;
// reads the next kv
@@ -133,10 +134,10 @@ public class SortBufferMemTable implements MemTable {
private MemTableIterator(
MutableObjectIterator<BinaryRowData> kvIter,
Comparator<RowData> keyComparator,
- Accumulator accumulator) {
+ MergeFunction mergeFunction) {
this.kvIter = kvIter;
this.keyComparator = keyComparator;
- this.accumulator = accumulator;
+ this.mergeFunction = mergeFunction;
int totalFieldCount = keyType.getFieldCount() + 2 + valueType.getFieldCount();
this.previous = new KeyValueSerializer(keyType, valueType);
@@ -175,8 +176,8 @@ public class SortBufferMemTable implements MemTable {
if (previousRow == null) {
return;
}
- accumulator.reset();
- accumulator.add(previous.getReusedKv().value());
+ mergeFunction.reset();
+ mergeFunction.add(previous.getReusedKv().value());
while (readOnce()) {
if (keyComparator.compare(
@@ -184,10 +185,10 @@ public class SortBufferMemTable implements MemTable {
!= 0) {
break;
}
- accumulator.add(current.getReusedKv().value());
+ mergeFunction.add(current.getReusedKv().value());
swapSerializers();
}
- result = accumulator.getValue();
+ result = mergeFunction.getValue();
} while (result == null);
previous.getReusedKv().setValue(result);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateAccumulator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
similarity index 83%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateAccumulator.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
index be16a09..b9befdf 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateAccumulator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
@@ -23,10 +23,10 @@ import org.apache.flink.table.data.RowData;
import javax.annotation.Nullable;
/**
- * A {@link Accumulator} where key is primary key (unique) and value is the full record, only keep
+ * A {@link MergeFunction} where key is primary key (unique) and value is the full record, only keep
* the latest one.
*/
-public class DeduplicateAccumulator implements Accumulator {
+public class DeduplicateMergeFunction implements MergeFunction {
private RowData latestValue;
@@ -47,7 +47,7 @@ public class DeduplicateAccumulator implements Accumulator {
}
@Override
- public Accumulator copy() {
- return new DeduplicateAccumulator();
+ public MergeFunction copy() {
+ return new DeduplicateMergeFunction();
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/Accumulator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunction.java
similarity index 71%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/Accumulator.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunction.java
index ae5c2fc..cc1c713 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/Accumulator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunction.java
@@ -25,19 +25,19 @@ import javax.annotation.Nullable;
import java.io.Serializable;
-/** Accumulators to merge multiple {@link KeyValue}s. */
-public interface Accumulator extends Serializable {
+/** Merge function to merge multiple {@link KeyValue}s. */
+public interface MergeFunction extends Serializable {
- /** Reset the accumulator to its default state. */
+ /** Reset the merge function to its default state. */
void reset();
- /** Add the given {@link RowData} to the accumulator. */
+ /** Add the given {@link RowData} to the merge function. */
void add(RowData value);
- /** Get current accumulated value. Return null if this accumulated result should be skipped. */
+ /** Get current merged value. Return null if this merged result should be skipped. */
@Nullable
RowData getValue();
- /** Create a new accumulator object with the same functionality as this one. */
- Accumulator copy();
+ /** Create a new merge function object with the same functionality as this one. */
+ MergeFunction copy();
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
index 0f3c184..ee2ed04 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
@@ -42,7 +42,7 @@ public class SortMergeReader implements RecordReader {
private final List<RecordReader> nextBatchReaders;
private final Comparator<RowData> userKeyComparator;
- private final Accumulator accumulator;
+ private final MergeFunction mergeFunction;
private final PriorityQueue<Element> minHeap;
private final List<Element> polled;
@@ -50,10 +50,10 @@ public class SortMergeReader implements RecordReader {
protected SortMergeReader(
List<RecordReader> readers,
Comparator<RowData> userKeyComparator,
- Accumulator accumulator) {
+ MergeFunction mergeFunction) {
this.nextBatchReaders = new ArrayList<>(readers);
this.userKeyComparator = userKeyComparator;
- this.accumulator = accumulator;
+ this.mergeFunction = mergeFunction;
this.minHeap =
new PriorityQueue<>(
@@ -70,10 +70,10 @@ public class SortMergeReader implements RecordReader {
public static RecordReader create(
List<RecordReader> readers,
Comparator<RowData> userKeyComparator,
- Accumulator accumulator) {
+ MergeFunction mergeFunction) {
return readers.size() == 1
? readers.get(0)
- : new SortMergeReader(readers, userKeyComparator, accumulator);
+ : new SortMergeReader(readers, userKeyComparator, mergeFunction);
}
@Nullable
@@ -130,9 +130,9 @@ public class SortMergeReader implements RecordReader {
if (!hasMore) {
return null;
}
- RowData accumulatedValue = accumulator.getValue();
- if (accumulatedValue != null) {
- return polled.get(polled.size() - 1).kv.setValue(accumulatedValue);
+ RowData mergedValue = mergeFunction.getValue();
+ if (mergedValue != null) {
+ return polled.get(polled.size() - 1).kv.setValue(mergedValue);
}
}
}
@@ -163,7 +163,7 @@ public class SortMergeReader implements RecordReader {
return false;
}
- accumulator.reset();
+ mergeFunction.reset();
RowData key =
Preconditions.checkNotNull(minHeap.peek(), "Min heap is empty. This is a bug.")
.kv
@@ -177,7 +177,7 @@ public class SortMergeReader implements RecordReader {
break;
}
minHeap.poll();
- accumulator.add(element.kv.value());
+ mergeFunction.add(element.kv.value());
polled.add(element);
}
return true;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountAccumulator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java
similarity index 86%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountAccumulator.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java
index 18d3931..b62157a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountAccumulator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java
@@ -26,10 +26,10 @@ import javax.annotation.Nullable;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
- * A {@link Accumulator} where key is the full record and value is a count which represents number
+ * A {@link MergeFunction} where key is the full record and value is a count which represents number
* of records of the exact same fields.
*/
-public class ValueCountAccumulator implements Accumulator {
+public class ValueCountMergeFunction implements MergeFunction {
private long total;
@@ -50,8 +50,8 @@ public class ValueCountAccumulator implements Accumulator {
}
@Override
- public Accumulator copy() {
- return new ValueCountAccumulator();
+ public MergeFunction copy() {
+ return new ValueCountMergeFunction();
}
private long count(RowData value) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
index d652c4c..fa412ed 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
@@ -22,9 +22,9 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
import org.apache.flink.table.store.file.mergetree.sst.SstFileReader;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -41,7 +41,7 @@ public class FileStoreReadImpl implements FileStoreRead {
private final SstFileReader.Factory sstFileReaderFactory;
private final Comparator<RowData> keyComparator;
- private final Accumulator accumulator;
+ private final MergeFunction mergeFunction;
private boolean keyProjected;
private boolean dropDelete = true;
@@ -50,13 +50,13 @@ public class FileStoreReadImpl implements FileStoreRead {
RowType keyType,
RowType valueType,
Comparator<RowData> keyComparator,
- Accumulator accumulator,
+ MergeFunction mergeFunction,
FileFormat fileFormat,
FileStorePathFactory pathFactory) {
this.sstFileReaderFactory =
new SstFileReader.Factory(keyType, valueType, fileFormat, pathFactory);
this.keyComparator = keyComparator;
- this.accumulator = accumulator;
+ this.mergeFunction = mergeFunction;
this.keyProjected = false;
}
@@ -105,7 +105,7 @@ public class FileStoreReadImpl implements FileStoreRead {
dropDelete,
sstFileReader,
keyComparator,
- accumulator.copy());
+ mergeFunction.copy());
}
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
index 441068a..2a47af1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
@@ -27,9 +27,9 @@ import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
import org.apache.flink.table.store.file.mergetree.SortBufferMemTable;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
import org.apache.flink.table.store.file.mergetree.sst.SstFileReader;
@@ -51,7 +51,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
private final SstFileReader.Factory sstFileReaderFactory;
private final SstFileWriter.Factory sstFileWriterFactory;
private final Comparator<RowData> keyComparator;
- private final Accumulator accumulator;
+ private final MergeFunction mergeFunction;
private final FileStorePathFactory pathFactory;
private final FileStoreScan scan;
private final MergeTreeOptions options;
@@ -60,7 +60,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
RowType keyType,
RowType valueType,
Comparator<RowData> keyComparator,
- Accumulator accumulator,
+ MergeFunction mergeFunction,
FileFormat fileFormat,
FileStorePathFactory pathFactory,
FileStoreScan scan,
@@ -71,7 +71,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
new SstFileWriter.Factory(
keyType, valueType, fileFormat, pathFactory, options.targetFileSize);
this.keyComparator = keyComparator;
- this.accumulator = accumulator;
+ this.mergeFunction = mergeFunction;
this.pathFactory = pathFactory;
this.scan = scan;
this.options = options;
@@ -123,7 +123,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
new Levels(keyComparator, restoreFiles, options.numLevels),
maxSequenceNumber,
keyComparator,
- accumulator.copy(),
+ mergeFunction.copy(),
sstFileWriter,
options.commitForceCompact);
}
@@ -147,7 +147,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
dropDelete,
sstFileReaderFactory.create(partition, bucket),
keyComparator,
- accumulator.copy())),
+ mergeFunction.copy())),
outputLevel);
return new CompactManager(
compactExecutor, compactStrategy, keyComparator, options.targetFileSize, rewriter);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 70ca005..4fb291c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -29,7 +29,7 @@ import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.mergetree.Increment;
import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
import org.apache.flink.table.store.file.operation.FileStoreCommit;
import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
@@ -82,7 +82,7 @@ public class TestFileStore extends FileStoreImpl {
RowType partitionType,
RowType keyType,
RowType valueType,
- Accumulator accumulator) {
+ MergeFunction mergeFunction) {
Configuration conf = new Configuration();
conf.set(MergeTreeOptions.WRITE_BUFFER_SIZE, MemorySize.parse("16 kb"));
@@ -98,7 +98,7 @@ public class TestFileStore extends FileStoreImpl {
conf.set(FileStoreOptions.TABLE_PATH, root);
conf.set(FileStoreOptions.BUCKET, numBuckets);
- return new TestFileStore(conf, partitionType, keyType, valueType, accumulator);
+ return new TestFileStore(conf, partitionType, keyType, valueType, mergeFunction);
}
public TestFileStore(
@@ -106,8 +106,8 @@ public class TestFileStore extends FileStoreImpl {
RowType partitionType,
RowType keyType,
RowType valueType,
- Accumulator accumulator) {
- super(conf, UUID.randomUUID().toString(), partitionType, keyType, valueType, accumulator);
+ MergeFunction mergeFunction) {
+ super(conf, UUID.randomUUID().toString(), partitionType, keyType, valueType, mergeFunction);
this.root = conf.getString(FileStoreOptions.TABLE_PATH);
this.keySerializer = new RowDataSerializer(keyType);
this.valueSerializer = new RowDataSerializer(valueType);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 18b84cc..e03b6f6 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.format.FlushingFileFormat;
import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
@@ -263,7 +263,7 @@ public class MergeTreeTest {
new Levels(comparator, files, options.numLevels),
maxSequenceNumber,
comparator,
- new DeduplicateAccumulator(),
+ new DeduplicateMergeFunction(),
sstFileWriter,
options.commitForceCompact);
}
@@ -284,7 +284,7 @@ public class MergeTreeTest {
dropDelete,
sstFileReader,
comparator,
- new DeduplicateAccumulator())),
+ new DeduplicateMergeFunction())),
outputLevel);
return new CompactManager(
compactExecutor, compactStrategy, comparator, options.targetFileSize, rewriter);
@@ -357,7 +357,7 @@ public class MergeTreeTest {
dropDelete,
sstFileReader,
comparator,
- new DeduplicateAccumulator());
+ new DeduplicateMergeFunction());
List<TestRecord> records = new ArrayList<>();
try (RecordReaderIterator iterator = new RecordReaderIterator(reader)) {
while (iterator.hasNext()) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTableTestBase.java
index 407e592..c92fff2 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTableTestBase.java
@@ -20,10 +20,10 @@ package org.apache.flink.table.store.file.mergetree;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
-import org.apache.flink.table.store.file.mergetree.compact.AccumulatorTestUtils;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
-import org.apache.flink.table.store.file.mergetree.compact.ValueCountAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunctionTestUtils;
+import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
import org.apache.flink.table.store.file.utils.ReusingKeyValue;
import org.apache.flink.table.store.file.utils.ReusingTestData;
import org.apache.flink.table.types.logical.BigIntType;
@@ -61,7 +61,7 @@ public abstract class SortBufferMemTableTestBase {
protected abstract List<ReusingTestData> getExpected(List<ReusingTestData> input);
- protected abstract Accumulator createAccumulator();
+ protected abstract MergeFunction createMergeFunction();
@Test
public void testAndClear() throws IOException {
@@ -95,7 +95,7 @@ public abstract class SortBufferMemTableTestBase {
protected void runTest(List<ReusingTestData> input) throws IOException {
List<ReusingTestData> expected = getExpected(input);
prepareTable(input);
- Iterator<KeyValue> actual = table.iterator(KEY_COMPARATOR, createAccumulator());
+ Iterator<KeyValue> actual = table.iterator(KEY_COMPARATOR, createMergeFunction());
Random rnd = new Random();
for (ReusingTestData data : expected) {
@@ -132,8 +132,8 @@ public abstract class SortBufferMemTableTestBase {
assertThat(table.size()).isEqualTo(input.size());
}
- /** Test for {@link SortBufferMemTable} with {@link DeduplicateAccumulator}. */
- public static class WithDeduplicateAccumulatorTest extends SortBufferMemTableTestBase {
+ /** Test for {@link SortBufferMemTable} with {@link DeduplicateMergeFunction}. */
+ public static class WithDeduplicateMergeFunctionTest extends SortBufferMemTableTestBase {
@Override
protected boolean addOnly() {
@@ -142,17 +142,17 @@ public abstract class SortBufferMemTableTestBase {
@Override
protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
- return AccumulatorTestUtils.getExpectedForDeduplicate(input);
+ return MergeFunctionTestUtils.getExpectedForDeduplicate(input);
}
@Override
- protected Accumulator createAccumulator() {
- return new DeduplicateAccumulator();
+ protected MergeFunction createMergeFunction() {
+ return new DeduplicateMergeFunction();
}
}
- /** Test for {@link SortBufferMemTable} with {@link ValueCountAccumulator}. */
- public static class WithValueCountAccumulatorTest extends SortBufferMemTableTestBase {
+ /** Test for {@link SortBufferMemTable} with {@link ValueCountMergeFunction}. */
+ public static class WithValueCountMergeFunctionTest extends SortBufferMemTableTestBase {
@Override
protected boolean addOnly() {
@@ -161,12 +161,12 @@ public abstract class SortBufferMemTableTestBase {
@Override
protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
- return AccumulatorTestUtils.getExpectedForValueCount(input);
+ return MergeFunctionTestUtils.getExpectedForValueCount(input);
}
@Override
- protected Accumulator createAccumulator() {
- return new ValueCountAccumulator();
+ protected MergeFunction createMergeFunction() {
+ return new ValueCountMergeFunction();
}
@Test
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/AccumulatorTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java
similarity index 95%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/AccumulatorTestUtils.java
rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java
index f322ff6..83dc92d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/AccumulatorTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java
@@ -26,8 +26,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-/** Test utils for {@link Accumulator}s. */
-public class AccumulatorTestUtils {
+/** Test utils for {@link MergeFunction}s. */
+public class MergeFunctionTestUtils {
public static List<ReusingTestData> getExpectedForValueCount(List<ReusingTestData> input) {
input = new ArrayList<>(input);
@@ -39,7 +39,7 @@ public class AccumulatorTestUtils {
ReusingTestData data = input.get(i);
Preconditions.checkArgument(
data.valueKind == ValueKind.ADD,
- "Only ADD value kind is supported for value count accumulator.");
+ "Only ADD value kind is supported for value count merge function.");
c += data.value;
if (i + 1 >= input.size() || data.key != input.get(i + 1).key) {
if (c != 0) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReaderTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReaderTestBase.java
index d77fba0..1894ff3 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReaderTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReaderTestBase.java
@@ -31,11 +31,11 @@ import java.util.List;
/** Tests for {@link SortMergeReader}. */
public abstract class SortMergeReaderTestBase extends CombiningRecordReaderTestBase {
- protected abstract Accumulator createAccumulator();
+ protected abstract MergeFunction createMergeFunction();
@Override
protected RecordReader createRecordReader(List<TestReusingRecordReader> readers) {
- return new SortMergeReader(new ArrayList<>(readers), KEY_COMPARATOR, createAccumulator());
+ return new SortMergeReader(new ArrayList<>(readers), KEY_COMPARATOR, createMergeFunction());
}
@Test
@@ -70,8 +70,8 @@ public abstract class SortMergeReaderTestBase extends CombiningRecordReaderTestB
+ "11, 507, +, 1100 | 12, 508, +, 1200 | 13, 509, +, 1300"));
}
- /** Tests for {@link SortMergeReader} with {@link DeduplicateAccumulator}. */
- public static class WithDeduplicateAccumulator extends SortMergeReaderTestBase {
+ /** Tests for {@link SortMergeReader} with {@link DeduplicateMergeFunction}. */
+ public static class WithDeduplicateMergeFunction extends SortMergeReaderTestBase {
@Override
protected boolean addOnly() {
@@ -80,17 +80,17 @@ public abstract class SortMergeReaderTestBase extends CombiningRecordReaderTestB
@Override
protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
- return AccumulatorTestUtils.getExpectedForDeduplicate(input);
+ return MergeFunctionTestUtils.getExpectedForDeduplicate(input);
}
@Override
- protected Accumulator createAccumulator() {
- return new DeduplicateAccumulator();
+ protected MergeFunction createMergeFunction() {
+ return new DeduplicateMergeFunction();
}
}
- /** Tests for {@link SortMergeReader} with {@link ValueCountAccumulator}. */
- public static class WithValueRecordAccumulatorTest extends SortMergeReaderTestBase {
+ /** Tests for {@link SortMergeReader} with {@link ValueCountMergeFunction}. */
+ public static class WithValueRecordMergeFunctionTest extends SortMergeReaderTestBase {
@Override
protected boolean addOnly() {
@@ -99,12 +99,12 @@ public abstract class SortMergeReaderTestBase extends CombiningRecordReaderTestB
@Override
protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
- return AccumulatorTestUtils.getExpectedForValueCount(input);
+ return MergeFunctionTestUtils.getExpectedForValueCount(input);
}
@Override
- protected Accumulator createAccumulator() {
- return new ValueCountAccumulator();
+ protected MergeFunction createMergeFunction() {
+ return new ValueCountMergeFunction();
}
@Test
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index 6a4c04b..f3ca1b0 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.TestFileStore;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
@@ -247,7 +247,7 @@ public class FileStoreCommitTest {
TestKeyValueGenerator.PARTITION_TYPE,
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.ROW_TYPE,
- new DeduplicateAccumulator());
+ new DeduplicateMergeFunction());
}
private List<KeyValue> generateDataList(int numRecords) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index bb12a2f..3761b4b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.TestFileStore;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.junit.jupiter.api.AfterEach;
@@ -62,7 +62,7 @@ public class FileStoreExpireTest {
TestKeyValueGenerator.PARTITION_TYPE,
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.ROW_TYPE,
- new DeduplicateAccumulator());
+ new DeduplicateMergeFunction());
pathFactory = store.pathFactory();
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
index 43033b5..0163292 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
@@ -27,9 +27,9 @@ import org.apache.flink.table.store.file.TestFileStore;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
-import org.apache.flink.table.store.file.mergetree.compact.ValueCountAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.types.logical.BigIntType;
@@ -103,7 +103,7 @@ public class FileStoreReadTest {
RowDataSerializer valueSerializer = new RowDataSerializer(valueType);
TestFileStore store =
- createStore(partitionType, keyType, valueType, new ValueCountAccumulator());
+ createStore(partitionType, keyType, valueType, new ValueCountMergeFunction());
List<KeyValue> readData =
writeThenRead(
data,
@@ -141,7 +141,7 @@ public class FileStoreReadTest {
TestKeyValueGenerator.PARTITION_TYPE,
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.ROW_TYPE,
- new DeduplicateAccumulator());
+ new DeduplicateMergeFunction());
RowDataSerializer projectedValueSerializer =
new RowDataSerializer(
@@ -224,8 +224,11 @@ public class FileStoreReadTest {
}
private TestFileStore createStore(
- RowType partitionType, RowType keyType, RowType valueType, Accumulator accumulator) {
+ RowType partitionType,
+ RowType keyType,
+ RowType valueType,
+ MergeFunction mergeFunction) {
return TestFileStore.create(
- "avro", tempDir.toString(), 1, partitionType, keyType, valueType, accumulator);
+ "avro", tempDir.toString(), 1, partitionType, keyType, valueType, mergeFunction);
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
index 4a71b75..a1cbdf5 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.store.file.TestFileStore;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestList;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.predicate.Equal;
import org.apache.flink.table.store.file.predicate.Literal;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -71,7 +71,7 @@ public class FileStoreScanTest {
TestKeyValueGenerator.PARTITION_TYPE,
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.ROW_TYPE,
- new DeduplicateAccumulator());
+ new DeduplicateMergeFunction());
pathFactory = store.pathFactory();
}