You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/14 14:14:35 UTC

[flink-table-store] branch master updated: [FLINK-26458] Rename Accumulator to MergeFunction

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new bdda9cf  [FLINK-26458] Rename Accumulator to MergeFunction
bdda9cf is described below

commit bdda9cfee81627fd8b5c290566b1cb5104ca33ad
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Mon Mar 14 22:14:27 2022 +0800

    [FLINK-26458] Rename Accumulator to MergeFunction
    
    This closes #42
---
 .../flink/table/store/connector/TableStore.java    | 14 +++++-----
 .../store/connector/source/TestDataReadWrite.java  | 11 ++++++--
 .../flink/table/store/file/FileStoreImpl.java      | 12 ++++----
 .../flink/table/store/file/mergetree/MemTable.java |  6 ++--
 .../store/file/mergetree/MergeTreeReader.java      | 12 ++++----
 .../store/file/mergetree/MergeTreeWriter.java      | 10 +++----
 .../store/file/mergetree/SortBufferMemTable.java   | 23 ++++++++--------
 ...umulator.java => DeduplicateMergeFunction.java} |  8 +++---
 .../{Accumulator.java => MergeFunction.java}       | 14 +++++-----
 .../file/mergetree/compact/SortMergeReader.java    | 20 +++++++-------
 ...cumulator.java => ValueCountMergeFunction.java} |  8 +++---
 .../store/file/operation/FileStoreReadImpl.java    | 10 +++----
 .../store/file/operation/FileStoreWriteImpl.java   | 12 ++++----
 .../flink/table/store/file/TestFileStore.java      | 10 +++----
 .../table/store/file/mergetree/MergeTreeTest.java  |  8 +++---
 .../file/mergetree/SortBufferMemTableTestBase.java | 32 +++++++++++-----------
 ...rTestUtils.java => MergeFunctionTestUtils.java} |  6 ++--
 .../mergetree/compact/SortMergeReaderTestBase.java | 24 ++++++++--------
 .../store/file/operation/FileStoreCommitTest.java  |  4 +--
 .../store/file/operation/FileStoreExpireTest.java  |  4 +--
 .../store/file/operation/FileStoreReadTest.java    | 17 +++++++-----
 .../store/file/operation/FileStoreScanTest.java    |  4 +--
 22 files changed, 140 insertions(+), 129 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index 609cee9..b67bf8f 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -41,9 +41,9 @@ import org.apache.flink.table.store.connector.source.LogHybridSourceFactory;
 import org.apache.flink.table.store.connector.source.StaticFileStoreSplitEnumerator;
 import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.FileStoreImpl;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
-import org.apache.flink.table.store.file.mergetree.compact.ValueCountAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.log.LogSinkProvider;
 import org.apache.flink.table.store.log.LogSourceProvider;
@@ -120,11 +120,11 @@ public class TableStore {
         RowType partitionType = ProjectionUtils.project(type, partitions);
         RowType keyType;
         RowType valueType;
-        Accumulator accumulator;
+        MergeFunction mergeFunction;
         if (primaryKeys.length == 0) {
             keyType = type;
             valueType = RowType.of(new BigIntType(false));
-            accumulator = new ValueCountAccumulator();
+            mergeFunction = new ValueCountMergeFunction();
         } else {
             List<RowType.RowField> fields = ProjectionUtils.project(type, primaryKeys).getFields();
             // add _KEY_ prefix to avoid conflict with value
@@ -139,9 +139,9 @@ public class TableStore {
                                                             f.getDescription().orElse(null)))
                                     .collect(Collectors.toList()));
             valueType = type;
-            accumulator = new DeduplicateAccumulator();
+            mergeFunction = new DeduplicateMergeFunction();
         }
-        return new FileStoreImpl(options, user, partitionType, keyType, valueType, accumulator);
+        return new FileStoreImpl(options, user, partitionType, keyType, valueType, mergeFunction);
     }
 
     /** Source builder to build a flink {@link Source}. */
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
index df68c95..80ca7b1 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
 import org.apache.flink.table.store.file.operation.FileStoreRead;
 import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
@@ -73,7 +73,12 @@ public class TestDataReadWrite {
 
     public FileStoreRead createRead() {
         return new FileStoreReadImpl(
-                KEY_TYPE, VALUE_TYPE, COMPARATOR, new DeduplicateAccumulator(), avro, pathFactory);
+                KEY_TYPE,
+                VALUE_TYPE,
+                COMPARATOR,
+                new DeduplicateMergeFunction(),
+                avro,
+                pathFactory);
     }
 
     public List<SstFileMeta> writeFiles(
@@ -95,7 +100,7 @@ public class TestDataReadWrite {
                         KEY_TYPE,
                         VALUE_TYPE,
                         COMPARATOR,
-                        new DeduplicateAccumulator(),
+                        new DeduplicateMergeFunction(),
                         avro,
                         pathFactory,
                         null, // not used, we only create an empty writer
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index 2d96faf..bc779c9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
 import org.apache.flink.table.runtime.generated.RecordComparator;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestList;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.operation.FileStoreCommitImpl;
 import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
 import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
@@ -47,7 +47,7 @@ public class FileStoreImpl implements FileStore {
     private final RowType partitionType;
     private final RowType keyType;
     private final RowType valueType;
-    private final Accumulator accumulator;
+    private final MergeFunction mergeFunction;
     private final GeneratedRecordComparator genRecordComparator;
 
     public FileStoreImpl(
@@ -56,13 +56,13 @@ public class FileStoreImpl implements FileStore {
             RowType partitionType,
             RowType keyType,
             RowType valueType,
-            Accumulator accumulator) {
+            MergeFunction mergeFunction) {
         this.options = new FileStoreOptions(options);
         this.user = user;
         this.partitionType = partitionType;
         this.keyType = keyType;
         this.valueType = valueType;
-        this.accumulator = accumulator;
+        this.mergeFunction = mergeFunction;
         this.genRecordComparator =
                 new SortCodeGenerator(
                                 new TableConfig(),
@@ -104,7 +104,7 @@ public class FileStoreImpl implements FileStore {
                 keyType,
                 valueType,
                 newKeyComparator(),
-                accumulator,
+                mergeFunction,
                 options.fileFormat(),
                 pathFactory(),
                 newScan(),
@@ -117,7 +117,7 @@ public class FileStoreImpl implements FileStore {
                 keyType,
                 valueType,
                 newKeyComparator(),
-                accumulator,
+                mergeFunction,
                 options.fileFormat(),
                 pathFactory());
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java
index d6ea49b..3dd0054 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.file.mergetree;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 
 import java.io.IOException;
 import java.util.Comparator;
@@ -47,9 +47,9 @@ public interface MemTable {
     /**
      * Returns an iterator over the records in this table. The elements are returned in the order of
      * key and sequence number and elements with the same key will be merged by the given {@link
-     * Accumulator}.
+     * MergeFunction}.
      */
-    Iterator<KeyValue> iterator(Comparator<RowData> keyComparator, Accumulator accumulator);
+    Iterator<KeyValue> iterator(Comparator<RowData> keyComparator, MergeFunction mergeFunction);
 
     /** Removes all records from this table. The table will be empty after this call returns. */
     void clear();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
index 01876ca..7e56db4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
@@ -21,9 +21,9 @@ package org.apache.flink.table.store.file.mergetree;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader.ReaderSupplier;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.SortMergeReader;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileReader;
@@ -48,14 +48,16 @@ public class MergeTreeReader implements RecordReader {
             boolean dropDelete,
             SstFileReader sstFileReader,
             Comparator<RowData> userKeyComparator,
-            Accumulator accumulator)
+            MergeFunction mergeFunction)
             throws IOException {
         this.dropDelete = dropDelete;
 
         List<ReaderSupplier> readers = new ArrayList<>();
         for (List<SortedRun> section : sections) {
             readers.add(
-                    () -> readerForSection(section, sstFileReader, userKeyComparator, accumulator));
+                    () ->
+                            readerForSection(
+                                    section, sstFileReader, userKeyComparator, mergeFunction));
         }
         this.reader = ConcatRecordReader.create(readers);
     }
@@ -104,13 +106,13 @@ public class MergeTreeReader implements RecordReader {
             List<SortedRun> section,
             SstFileReader sstFileReader,
             Comparator<RowData> userKeyComparator,
-            Accumulator accumulator)
+            MergeFunction mergeFunction)
             throws IOException {
         List<RecordReader> readers = new ArrayList<>();
         for (SortedRun run : section) {
             readers.add(readerForRun(run, sstFileReader));
         }
-        return SortMergeReader.create(readers, userKeyComparator, accumulator);
+        return SortMergeReader.create(readers, userKeyComparator, mergeFunction);
     }
 
     public static RecordReader readerForRun(SortedRun run, SstFileReader sstFileReader)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 89771f2..4565ad5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -22,8 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
 import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileWriter;
 import org.apache.flink.table.store.file.utils.RecordWriter;
@@ -51,7 +51,7 @@ public class MergeTreeWriter implements RecordWriter {
 
     private final Comparator<RowData> keyComparator;
 
-    private final Accumulator accumulator;
+    private final MergeFunction mergeFunction;
 
     private final SstFileWriter sstFileWriter;
 
@@ -71,7 +71,7 @@ public class MergeTreeWriter implements RecordWriter {
             Levels levels,
             long maxSequenceNumber,
             Comparator<RowData> keyComparator,
-            Accumulator accumulator,
+            MergeFunction mergeFunction,
             SstFileWriter sstFileWriter,
             boolean commitForceCompact) {
         this.memTable = memTable;
@@ -79,7 +79,7 @@ public class MergeTreeWriter implements RecordWriter {
         this.levels = levels;
         this.newSequenceNumber = maxSequenceNumber + 1;
         this.keyComparator = keyComparator;
-        this.accumulator = accumulator;
+        this.mergeFunction = mergeFunction;
         this.sstFileWriter = sstFileWriter;
         this.commitForceCompact = commitForceCompact;
         this.newFiles = new LinkedHashSet<>();
@@ -112,7 +112,7 @@ public class MergeTreeWriter implements RecordWriter {
     private void flush() throws Exception {
         if (memTable.size() > 0) {
             finishCompaction();
-            Iterator<KeyValue> iterator = memTable.iterator(keyComparator, accumulator);
+            Iterator<KeyValue> iterator = memTable.iterator(keyComparator, mergeFunction);
             List<SstFileMeta> files =
                     sstFileWriter.write(CloseableIterator.adapterForIterator(iterator), 0);
             newFiles.addAll(files);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
index 581964a..441721b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
@@ -32,7 +32,7 @@ import org.apache.flink.table.runtime.typeutils.InternalSerializers;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueSerializer;
 import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.utils.HeapMemorySegmentPool;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -106,10 +106,11 @@ public class SortBufferMemTable implements MemTable {
     }
 
     @Override
-    public Iterator<KeyValue> iterator(Comparator<RowData> keyComparator, Accumulator accumulator) {
+    public Iterator<KeyValue> iterator(
+            Comparator<RowData> keyComparator, MergeFunction mergeFunction) {
         new QuickSort().sort(buffer);
         MutableObjectIterator<BinaryRowData> kvIter = buffer.getIterator();
-        return new MemTableIterator(kvIter, keyComparator, accumulator);
+        return new MemTableIterator(kvIter, keyComparator, mergeFunction);
     }
 
     @Override
@@ -120,9 +121,9 @@ public class SortBufferMemTable implements MemTable {
     private class MemTableIterator implements Iterator<KeyValue> {
         private final MutableObjectIterator<BinaryRowData> kvIter;
         private final Comparator<RowData> keyComparator;
-        private final Accumulator accumulator;
+        private final MergeFunction mergeFunction;
 
-        // holds the accumulated value
+        // holds the merged value
         private KeyValueSerializer previous;
         private BinaryRowData previousRow;
         // reads the next kv
@@ -133,10 +134,10 @@ public class SortBufferMemTable implements MemTable {
         private MemTableIterator(
                 MutableObjectIterator<BinaryRowData> kvIter,
                 Comparator<RowData> keyComparator,
-                Accumulator accumulator) {
+                MergeFunction mergeFunction) {
             this.kvIter = kvIter;
             this.keyComparator = keyComparator;
-            this.accumulator = accumulator;
+            this.mergeFunction = mergeFunction;
 
             int totalFieldCount = keyType.getFieldCount() + 2 + valueType.getFieldCount();
             this.previous = new KeyValueSerializer(keyType, valueType);
@@ -175,8 +176,8 @@ public class SortBufferMemTable implements MemTable {
                 if (previousRow == null) {
                     return;
                 }
-                accumulator.reset();
-                accumulator.add(previous.getReusedKv().value());
+                mergeFunction.reset();
+                mergeFunction.add(previous.getReusedKv().value());
 
                 while (readOnce()) {
                     if (keyComparator.compare(
@@ -184,10 +185,10 @@ public class SortBufferMemTable implements MemTable {
                             != 0) {
                         break;
                     }
-                    accumulator.add(current.getReusedKv().value());
+                    mergeFunction.add(current.getReusedKv().value());
                     swapSerializers();
                 }
-                result = accumulator.getValue();
+                result = mergeFunction.getValue();
             } while (result == null);
             previous.getReusedKv().setValue(result);
         }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateAccumulator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
similarity index 83%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateAccumulator.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
index be16a09..b9befdf 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateAccumulator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
@@ -23,10 +23,10 @@ import org.apache.flink.table.data.RowData;
 import javax.annotation.Nullable;
 
 /**
- * A {@link Accumulator} where key is primary key (unique) and value is the full record, only keep
+ * A {@link MergeFunction} where key is primary key (unique) and value is the full record, only keep
  * the latest one.
  */
-public class DeduplicateAccumulator implements Accumulator {
+public class DeduplicateMergeFunction implements MergeFunction {
 
     private RowData latestValue;
 
@@ -47,7 +47,7 @@ public class DeduplicateAccumulator implements Accumulator {
     }
 
     @Override
-    public Accumulator copy() {
-        return new DeduplicateAccumulator();
+    public MergeFunction copy() {
+        return new DeduplicateMergeFunction();
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/Accumulator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunction.java
similarity index 71%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/Accumulator.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunction.java
index ae5c2fc..cc1c713 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/Accumulator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunction.java
@@ -25,19 +25,19 @@ import javax.annotation.Nullable;
 
 import java.io.Serializable;
 
-/** Accumulators to merge multiple {@link KeyValue}s. */
-public interface Accumulator extends Serializable {
+/** Merge function to merge multiple {@link KeyValue}s. */
+public interface MergeFunction extends Serializable {
 
-    /** Reset the accumulator to its default state. */
+    /** Reset the merge function to its default state. */
     void reset();
 
-    /** Add the given {@link RowData} to the accumulator. */
+    /** Add the given {@link RowData} to the merge function. */
     void add(RowData value);
 
-    /** Get current accumulated value. Return null if this accumulated result should be skipped. */
+    /** Get current merged value. Return null if this merged result should be skipped. */
     @Nullable
     RowData getValue();
 
-    /** Create a new accumulator object with the same functionality as this one. */
-    Accumulator copy();
+    /** Create a new merge function object with the same functionality as this one. */
+    MergeFunction copy();
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
index 0f3c184..ee2ed04 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
@@ -42,7 +42,7 @@ public class SortMergeReader implements RecordReader {
 
     private final List<RecordReader> nextBatchReaders;
     private final Comparator<RowData> userKeyComparator;
-    private final Accumulator accumulator;
+    private final MergeFunction mergeFunction;
 
     private final PriorityQueue<Element> minHeap;
     private final List<Element> polled;
@@ -50,10 +50,10 @@ public class SortMergeReader implements RecordReader {
     protected SortMergeReader(
             List<RecordReader> readers,
             Comparator<RowData> userKeyComparator,
-            Accumulator accumulator) {
+            MergeFunction mergeFunction) {
         this.nextBatchReaders = new ArrayList<>(readers);
         this.userKeyComparator = userKeyComparator;
-        this.accumulator = accumulator;
+        this.mergeFunction = mergeFunction;
 
         this.minHeap =
                 new PriorityQueue<>(
@@ -70,10 +70,10 @@ public class SortMergeReader implements RecordReader {
     public static RecordReader create(
             List<RecordReader> readers,
             Comparator<RowData> userKeyComparator,
-            Accumulator accumulator) {
+            MergeFunction mergeFunction) {
         return readers.size() == 1
                 ? readers.get(0)
-                : new SortMergeReader(readers, userKeyComparator, accumulator);
+                : new SortMergeReader(readers, userKeyComparator, mergeFunction);
     }
 
     @Nullable
@@ -130,9 +130,9 @@ public class SortMergeReader implements RecordReader {
                 if (!hasMore) {
                     return null;
                 }
-                RowData accumulatedValue = accumulator.getValue();
-                if (accumulatedValue != null) {
-                    return polled.get(polled.size() - 1).kv.setValue(accumulatedValue);
+                RowData mergedValue = mergeFunction.getValue();
+                if (mergedValue != null) {
+                    return polled.get(polled.size() - 1).kv.setValue(mergedValue);
                 }
             }
         }
@@ -163,7 +163,7 @@ public class SortMergeReader implements RecordReader {
                 return false;
             }
 
-            accumulator.reset();
+            mergeFunction.reset();
             RowData key =
                     Preconditions.checkNotNull(minHeap.peek(), "Min heap is empty. This is a bug.")
                             .kv
@@ -177,7 +177,7 @@ public class SortMergeReader implements RecordReader {
                     break;
                 }
                 minHeap.poll();
-                accumulator.add(element.kv.value());
+                mergeFunction.add(element.kv.value());
                 polled.add(element);
             }
             return true;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountAccumulator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java
similarity index 86%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountAccumulator.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java
index 18d3931..b62157a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountAccumulator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java
@@ -26,10 +26,10 @@ import javax.annotation.Nullable;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
- * A {@link Accumulator} where key is the full record and value is a count which represents number
+ * A {@link MergeFunction} where key is the full record and value is a count which represents number
  * of records of the exact same fields.
  */
-public class ValueCountAccumulator implements Accumulator {
+public class ValueCountMergeFunction implements MergeFunction {
 
     private long total;
 
@@ -50,8 +50,8 @@ public class ValueCountAccumulator implements Accumulator {
     }
 
     @Override
-    public Accumulator copy() {
-        return new ValueCountAccumulator();
+    public MergeFunction copy() {
+        return new ValueCountMergeFunction();
     }
 
     private long count(RowData value) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
index d652c4c..fa412ed 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
@@ -22,9 +22,9 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
 import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileReader;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -41,7 +41,7 @@ public class FileStoreReadImpl implements FileStoreRead {
 
     private final SstFileReader.Factory sstFileReaderFactory;
     private final Comparator<RowData> keyComparator;
-    private final Accumulator accumulator;
+    private final MergeFunction mergeFunction;
 
     private boolean keyProjected;
     private boolean dropDelete = true;
@@ -50,13 +50,13 @@ public class FileStoreReadImpl implements FileStoreRead {
             RowType keyType,
             RowType valueType,
             Comparator<RowData> keyComparator,
-            Accumulator accumulator,
+            MergeFunction mergeFunction,
             FileFormat fileFormat,
             FileStorePathFactory pathFactory) {
         this.sstFileReaderFactory =
                 new SstFileReader.Factory(keyType, valueType, fileFormat, pathFactory);
         this.keyComparator = keyComparator;
-        this.accumulator = accumulator;
+        this.mergeFunction = mergeFunction;
 
         this.keyProjected = false;
     }
@@ -105,7 +105,7 @@ public class FileStoreReadImpl implements FileStoreRead {
                     dropDelete,
                     sstFileReader,
                     keyComparator,
-                    accumulator.copy());
+                    mergeFunction.copy());
         }
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
index 441068a..2a47af1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
@@ -27,9 +27,9 @@ import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
 import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
 import org.apache.flink.table.store.file.mergetree.SortBufferMemTable;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
 import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
 import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileReader;
@@ -51,7 +51,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
     private final SstFileReader.Factory sstFileReaderFactory;
     private final SstFileWriter.Factory sstFileWriterFactory;
     private final Comparator<RowData> keyComparator;
-    private final Accumulator accumulator;
+    private final MergeFunction mergeFunction;
     private final FileStorePathFactory pathFactory;
     private final FileStoreScan scan;
     private final MergeTreeOptions options;
@@ -60,7 +60,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
             RowType keyType,
             RowType valueType,
             Comparator<RowData> keyComparator,
-            Accumulator accumulator,
+            MergeFunction mergeFunction,
             FileFormat fileFormat,
             FileStorePathFactory pathFactory,
             FileStoreScan scan,
@@ -71,7 +71,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
                 new SstFileWriter.Factory(
                         keyType, valueType, fileFormat, pathFactory, options.targetFileSize);
         this.keyComparator = keyComparator;
-        this.accumulator = accumulator;
+        this.mergeFunction = mergeFunction;
         this.pathFactory = pathFactory;
         this.scan = scan;
         this.options = options;
@@ -123,7 +123,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
                 new Levels(keyComparator, restoreFiles, options.numLevels),
                 maxSequenceNumber,
                 keyComparator,
-                accumulator.copy(),
+                mergeFunction.copy(),
                 sstFileWriter,
                 options.commitForceCompact);
     }
@@ -147,7 +147,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
                                                 dropDelete,
                                                 sstFileReaderFactory.create(partition, bucket),
                                                 keyComparator,
-                                                accumulator.copy())),
+                                                mergeFunction.copy())),
                                 outputLevel);
         return new CompactManager(
                 compactExecutor, compactStrategy, keyComparator, options.targetFileSize, rewriter);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 70ca005..4fb291c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -29,7 +29,7 @@ import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.mergetree.Increment;
 import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
 import org.apache.flink.table.store.file.operation.FileStoreCommit;
 import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
@@ -82,7 +82,7 @@ public class TestFileStore extends FileStoreImpl {
             RowType partitionType,
             RowType keyType,
             RowType valueType,
-            Accumulator accumulator) {
+            MergeFunction mergeFunction) {
         Configuration conf = new Configuration();
 
         conf.set(MergeTreeOptions.WRITE_BUFFER_SIZE, MemorySize.parse("16 kb"));
@@ -98,7 +98,7 @@ public class TestFileStore extends FileStoreImpl {
         conf.set(FileStoreOptions.TABLE_PATH, root);
         conf.set(FileStoreOptions.BUCKET, numBuckets);
 
-        return new TestFileStore(conf, partitionType, keyType, valueType, accumulator);
+        return new TestFileStore(conf, partitionType, keyType, valueType, mergeFunction);
     }
 
     public TestFileStore(
@@ -106,8 +106,8 @@ public class TestFileStore extends FileStoreImpl {
             RowType partitionType,
             RowType keyType,
             RowType valueType,
-            Accumulator accumulator) {
-        super(conf, UUID.randomUUID().toString(), partitionType, keyType, valueType, accumulator);
+            MergeFunction mergeFunction) {
+        super(conf, UUID.randomUUID().toString(), partitionType, keyType, valueType, mergeFunction);
         this.root = conf.getString(FileStoreOptions.TABLE_PATH);
         this.keySerializer = new RowDataSerializer(keyType);
         this.valueSerializer = new RowDataSerializer(valueType);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 18b84cc..e03b6f6 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.format.FlushingFileFormat;
 import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
 import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
 import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
@@ -263,7 +263,7 @@ public class MergeTreeTest {
                 new Levels(comparator, files, options.numLevels),
                 maxSequenceNumber,
                 comparator,
-                new DeduplicateAccumulator(),
+                new DeduplicateMergeFunction(),
                 sstFileWriter,
                 options.commitForceCompact);
     }
@@ -284,7 +284,7 @@ public class MergeTreeTest {
                                                 dropDelete,
                                                 sstFileReader,
                                                 comparator,
-                                                new DeduplicateAccumulator())),
+                                                new DeduplicateMergeFunction())),
                                 outputLevel);
         return new CompactManager(
                 compactExecutor, compactStrategy, comparator, options.targetFileSize, rewriter);
@@ -357,7 +357,7 @@ public class MergeTreeTest {
                         dropDelete,
                         sstFileReader,
                         comparator,
-                        new DeduplicateAccumulator());
+                        new DeduplicateMergeFunction());
         List<TestRecord> records = new ArrayList<>();
         try (RecordReaderIterator iterator = new RecordReaderIterator(reader)) {
             while (iterator.hasNext()) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTableTestBase.java
index 407e592..c92fff2 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTableTestBase.java
@@ -20,10 +20,10 @@ package org.apache.flink.table.store.file.mergetree;
 
 import org.apache.flink.table.runtime.generated.RecordComparator;
 import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
-import org.apache.flink.table.store.file.mergetree.compact.AccumulatorTestUtils;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
-import org.apache.flink.table.store.file.mergetree.compact.ValueCountAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunctionTestUtils;
+import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
 import org.apache.flink.table.store.file.utils.ReusingKeyValue;
 import org.apache.flink.table.store.file.utils.ReusingTestData;
 import org.apache.flink.table.types.logical.BigIntType;
@@ -61,7 +61,7 @@ public abstract class SortBufferMemTableTestBase {
 
     protected abstract List<ReusingTestData> getExpected(List<ReusingTestData> input);
 
-    protected abstract Accumulator createAccumulator();
+    protected abstract MergeFunction createMergeFunction();
 
     @Test
     public void testAndClear() throws IOException {
@@ -95,7 +95,7 @@ public abstract class SortBufferMemTableTestBase {
     protected void runTest(List<ReusingTestData> input) throws IOException {
         List<ReusingTestData> expected = getExpected(input);
         prepareTable(input);
-        Iterator<KeyValue> actual = table.iterator(KEY_COMPARATOR, createAccumulator());
+        Iterator<KeyValue> actual = table.iterator(KEY_COMPARATOR, createMergeFunction());
 
         Random rnd = new Random();
         for (ReusingTestData data : expected) {
@@ -132,8 +132,8 @@ public abstract class SortBufferMemTableTestBase {
         assertThat(table.size()).isEqualTo(input.size());
     }
 
-    /** Test for {@link SortBufferMemTable} with {@link DeduplicateAccumulator}. */
-    public static class WithDeduplicateAccumulatorTest extends SortBufferMemTableTestBase {
+    /** Test for {@link SortBufferMemTable} with {@link DeduplicateMergeFunction}. */
+    public static class WithDeduplicateMergeFunctionTest extends SortBufferMemTableTestBase {
 
         @Override
         protected boolean addOnly() {
@@ -142,17 +142,17 @@ public abstract class SortBufferMemTableTestBase {
 
         @Override
         protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
-            return AccumulatorTestUtils.getExpectedForDeduplicate(input);
+            return MergeFunctionTestUtils.getExpectedForDeduplicate(input);
         }
 
         @Override
-        protected Accumulator createAccumulator() {
-            return new DeduplicateAccumulator();
+        protected MergeFunction createMergeFunction() {
+            return new DeduplicateMergeFunction();
         }
     }
 
-    /** Test for {@link SortBufferMemTable} with {@link ValueCountAccumulator}. */
-    public static class WithValueCountAccumulatorTest extends SortBufferMemTableTestBase {
+    /** Test for {@link SortBufferMemTable} with {@link ValueCountMergeFunction}. */
+    public static class WithValueCountMergeFunctionTest extends SortBufferMemTableTestBase {
 
         @Override
         protected boolean addOnly() {
@@ -161,12 +161,12 @@ public abstract class SortBufferMemTableTestBase {
 
         @Override
         protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
-            return AccumulatorTestUtils.getExpectedForValueCount(input);
+            return MergeFunctionTestUtils.getExpectedForValueCount(input);
         }
 
         @Override
-        protected Accumulator createAccumulator() {
-            return new ValueCountAccumulator();
+        protected MergeFunction createMergeFunction() {
+            return new ValueCountMergeFunction();
         }
 
         @Test
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/AccumulatorTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java
similarity index 95%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/AccumulatorTestUtils.java
rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java
index f322ff6..83dc92d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/AccumulatorTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java
@@ -26,8 +26,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-/** Test utils for {@link Accumulator}s. */
-public class AccumulatorTestUtils {
+/** Test utils for {@link MergeFunction}s. */
+public class MergeFunctionTestUtils {
 
     public static List<ReusingTestData> getExpectedForValueCount(List<ReusingTestData> input) {
         input = new ArrayList<>(input);
@@ -39,7 +39,7 @@ public class AccumulatorTestUtils {
             ReusingTestData data = input.get(i);
             Preconditions.checkArgument(
                     data.valueKind == ValueKind.ADD,
-                    "Only ADD value kind is supported for value count accumulator.");
+                    "Only ADD value kind is supported for value count merge function.");
             c += data.value;
             if (i + 1 >= input.size() || data.key != input.get(i + 1).key) {
                 if (c != 0) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReaderTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReaderTestBase.java
index d77fba0..1894ff3 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReaderTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReaderTestBase.java
@@ -31,11 +31,11 @@ import java.util.List;
 /** Tests for {@link SortMergeReader}. */
 public abstract class SortMergeReaderTestBase extends CombiningRecordReaderTestBase {
 
-    protected abstract Accumulator createAccumulator();
+    protected abstract MergeFunction createMergeFunction();
 
     @Override
     protected RecordReader createRecordReader(List<TestReusingRecordReader> readers) {
-        return new SortMergeReader(new ArrayList<>(readers), KEY_COMPARATOR, createAccumulator());
+        return new SortMergeReader(new ArrayList<>(readers), KEY_COMPARATOR, createMergeFunction());
     }
 
     @Test
@@ -70,8 +70,8 @@ public abstract class SortMergeReaderTestBase extends CombiningRecordReaderTestB
                                 + "11, 507, +, 1100 | 12, 508, +, 1200 | 13, 509, +, 1300"));
     }
 
-    /** Tests for {@link SortMergeReader} with {@link DeduplicateAccumulator}. */
-    public static class WithDeduplicateAccumulator extends SortMergeReaderTestBase {
+    /** Tests for {@link SortMergeReader} with {@link DeduplicateMergeFunction}. */
+    public static class WithDeduplicateMergeFunction extends SortMergeReaderTestBase {
 
         @Override
         protected boolean addOnly() {
@@ -80,17 +80,17 @@ public abstract class SortMergeReaderTestBase extends CombiningRecordReaderTestB
 
         @Override
         protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
-            return AccumulatorTestUtils.getExpectedForDeduplicate(input);
+            return MergeFunctionTestUtils.getExpectedForDeduplicate(input);
         }
 
         @Override
-        protected Accumulator createAccumulator() {
-            return new DeduplicateAccumulator();
+        protected MergeFunction createMergeFunction() {
+            return new DeduplicateMergeFunction();
         }
     }
 
-    /** Tests for {@link SortMergeReader} with {@link ValueCountAccumulator}. */
-    public static class WithValueRecordAccumulatorTest extends SortMergeReaderTestBase {
+    /** Tests for {@link SortMergeReader} with {@link ValueCountMergeFunction}. */
+    public static class WithValueRecordMergeFunctionTest extends SortMergeReaderTestBase {
 
         @Override
         protected boolean addOnly() {
@@ -99,12 +99,12 @@ public abstract class SortMergeReaderTestBase extends CombiningRecordReaderTestB
 
         @Override
         protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
-            return AccumulatorTestUtils.getExpectedForValueCount(input);
+            return MergeFunctionTestUtils.getExpectedForValueCount(input);
         }
 
         @Override
-        protected Accumulator createAccumulator() {
-            return new ValueCountAccumulator();
+        protected MergeFunction createMergeFunction() {
+            return new ValueCountMergeFunction();
         }
 
         @Test
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index 6a4c04b..f3ca1b0 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.TestFileStore;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
 
@@ -247,7 +247,7 @@ public class FileStoreCommitTest {
                 TestKeyValueGenerator.PARTITION_TYPE,
                 TestKeyValueGenerator.KEY_TYPE,
                 TestKeyValueGenerator.ROW_TYPE,
-                new DeduplicateAccumulator());
+                new DeduplicateMergeFunction());
     }
 
     private List<KeyValue> generateDataList(int numRecords) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index bb12a2f..3761b4b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.TestFileStore;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 
 import org.junit.jupiter.api.AfterEach;
@@ -62,7 +62,7 @@ public class FileStoreExpireTest {
                         TestKeyValueGenerator.PARTITION_TYPE,
                         TestKeyValueGenerator.KEY_TYPE,
                         TestKeyValueGenerator.ROW_TYPE,
-                        new DeduplicateAccumulator());
+                        new DeduplicateMergeFunction());
         pathFactory = store.pathFactory();
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
index 43033b5..0163292 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
@@ -27,9 +27,9 @@ import org.apache.flink.table.store.file.TestFileStore;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
-import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
-import org.apache.flink.table.store.file.mergetree.compact.ValueCountAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.types.logical.BigIntType;
@@ -103,7 +103,7 @@ public class FileStoreReadTest {
         RowDataSerializer valueSerializer = new RowDataSerializer(valueType);
 
         TestFileStore store =
-                createStore(partitionType, keyType, valueType, new ValueCountAccumulator());
+                createStore(partitionType, keyType, valueType, new ValueCountMergeFunction());
         List<KeyValue> readData =
                 writeThenRead(
                         data,
@@ -141,7 +141,7 @@ public class FileStoreReadTest {
                         TestKeyValueGenerator.PARTITION_TYPE,
                         TestKeyValueGenerator.KEY_TYPE,
                         TestKeyValueGenerator.ROW_TYPE,
-                        new DeduplicateAccumulator());
+                        new DeduplicateMergeFunction());
 
         RowDataSerializer projectedValueSerializer =
                 new RowDataSerializer(
@@ -224,8 +224,11 @@ public class FileStoreReadTest {
     }
 
     private TestFileStore createStore(
-            RowType partitionType, RowType keyType, RowType valueType, Accumulator accumulator) {
+            RowType partitionType,
+            RowType keyType,
+            RowType valueType,
+            MergeFunction mergeFunction) {
         return TestFileStore.create(
-                "avro", tempDir.toString(), 1, partitionType, keyType, valueType, accumulator);
+                "avro", tempDir.toString(), 1, partitionType, keyType, valueType, mergeFunction);
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
index 4a71b75..a1cbdf5 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.store.file.TestFileStore;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestList;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.predicate.Equal;
 import org.apache.flink.table.store.file.predicate.Literal;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -71,7 +71,7 @@ public class FileStoreScanTest {
                         TestKeyValueGenerator.PARTITION_TYPE,
                         TestKeyValueGenerator.KEY_TYPE,
                         TestKeyValueGenerator.ROW_TYPE,
-                        new DeduplicateAccumulator());
+                        new DeduplicateMergeFunction());
         pathFactory = store.pathFactory();
     }