Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/01 11:28:47 UTC

[flink-table-store] branch master updated: [FLINK-26031] Support projection pushdown on keys and values

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b7efeb  [FLINK-26031] Support projection pushdown on keys and values
6b7efeb is described below

commit 6b7efeb22f0501c6e3859059684b219d69e7d4e6
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Mar 1 19:28:43 2022 +0800

    [FLINK-26031] Support projection pushdown on keys and values
    
    This closes #18
---
 .../apache/flink/table/store/file/FileFormat.java  |  17 +-
 .../flink/table/store/file/FileFormatImpl.java     |  23 ++-
 .../apache/flink/table/store/file/KeyValue.java    |  27 +++
 .../store/file/mergetree/sst/SstFileReader.java    |  49 ++++-
 .../table/store/file/operation/FileStoreRead.java  |  13 +-
 .../store/file/operation/FileStoreReadImpl.java    |  38 +++-
 .../store/file/mergetree/sst/SstFileTest.java      | 211 ++++++++++++-------
 .../store/file/operation/FileStoreReadTest.java    | 230 +++++++++++++++++++++
 .../table/store/file/utils/FlushingAvroFormat.java |   4 +-
 9 files changed, 515 insertions(+), 97 deletions(-)
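
The projections throughout this change use the nested index format of
org.apache.flink.table.connector.Projection#toNestedIndexes(): each outer entry of the
int[][] selects one output field, and the inner indexes walk from a top-level field into
nested rows. A minimal sketch with a hypothetical schema (not part of this commit):

    // Hypothetical schema: ROW<a INT, b ROW<b0 INT, b1 BIGINT>, c STRING>
    // Project to (c, b.b1):
    int[][] projection = new int[][] {
        new int[] {2},    // top-level field c
        new int[] {1, 1}  // field b, then its nested field b1
    };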

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormat.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormat.java
index bec0775..4309284 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormat.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormat.java
@@ -37,18 +37,29 @@ import java.util.List;
 public interface FileFormat {
 
     /**
-     * Create a {@link BulkFormat} from the type.
+     * Create a {@link BulkFormat} from the type, with projection pushed down.
      *
+     * @param type Type without projection.
+     * @param projection See {@link org.apache.flink.table.connector.Projection#toNestedIndexes()}.
      * @param filters A list of filters in conjunctive form for filtering on a best-effort basis.
      */
     BulkFormat<RowData, FileSourceSplit> createReaderFactory(
-            RowType type, List<ResolvedExpression> filters);
+            RowType type, int[][] projection, List<ResolvedExpression> filters);
 
     /** Create a {@link BulkWriter.Factory} from the type. */
     BulkWriter.Factory<RowData> createWriterFactory(RowType type);
 
     default BulkFormat<RowData, FileSourceSplit> createReaderFactory(RowType rowType) {
-        return createReaderFactory(rowType, new ArrayList<>());
+        int[][] projection = new int[rowType.getFieldCount()][];
+        for (int i = 0; i < projection.length; i++) {
+            projection[i] = new int[] {i};
+        }
+        return createReaderFactory(rowType, projection);
+    }
+
+    default BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+            RowType rowType, int[][] projection) {
+        return createReaderFactory(rowType, projection, new ArrayList<>());
     }
 
     /** Create a {@link FileFormatImpl} from format identifier and format options. */
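
A hedged usage sketch of the overloads above (fileFormat and rowType are assumed to be in
scope; this is not code from the commit). The two-argument variant defaults to an empty
filter list, and the one-argument variant falls back to the identity projection built in
the loop above:

    // Read only top-level fields 0 and 2, with no filters pushed down.
    int[][] projection = new int[][] {new int[] {0}, new int[] {2}};
    BulkFormat<RowData, FileSourceSplit> readerFactory =
            fileFormat.createReaderFactory(rowType, projection);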
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormatImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormatImpl.java
index 2008ee2..cdcdd84 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormatImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormatImpl.java
@@ -26,6 +26,7 @@ import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
 import org.apache.flink.connector.file.table.factories.BulkWriterFormatFactory;
 import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
@@ -35,6 +36,7 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
 
 import java.util.List;
 
@@ -54,15 +56,24 @@ public class FileFormatImpl implements FileFormat {
         this.formatOptions = formatOptions;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
-            RowType rowType, List<ResolvedExpression> filters) {
-        BulkDecodingFormat<RowData> decodingFormat =
-                FactoryUtil.discoverFactory(
-                                classLoader, BulkReaderFormatFactory.class, formatIdentifier)
-                        .createDecodingFormat(null, formatOptions); // context is useless
+            RowType rowType, int[][] projection, List<ResolvedExpression> filters) {
+        BulkDecodingFormat<RowData> decodingFormat = getDecodingFormat();
+        // TODO use ProjectingBulkFormat if not supported
+        Preconditions.checkState(
+                decodingFormat instanceof ProjectableDecodingFormat,
+                "Format " + formatIdentifier + " does not support projection push down");
         decodingFormat.applyFilters(filters);
-        return decodingFormat.createRuntimeDecoder(SOURCE_CONTEXT, fromLogicalToDataType(rowType));
+        return ((ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>) decodingFormat)
+                .createRuntimeDecoder(SOURCE_CONTEXT, fromLogicalToDataType(rowType), projection);
+    }
+
+    private BulkDecodingFormat<RowData> getDecodingFormat() {
+        return FactoryUtil.discoverFactory(
+                        classLoader, BulkReaderFormatFactory.class, formatIdentifier)
+                .createDecodingFormat(null, formatOptions); // context is useless
     }
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
index 8461d64..a470376 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
@@ -78,6 +78,33 @@ public class KeyValue {
         return new RowType(fields);
     }
 
+    public static int[][] project(
+            int[][] keyProjection, int[][] valueProjection, int numKeyFields) {
+        int[][] projection = new int[keyProjection.length + 2 + valueProjection.length][];
+
+        // key
+        for (int i = 0; i < keyProjection.length; i++) {
+            projection[i] = new int[keyProjection[i].length];
+            System.arraycopy(keyProjection[i], 0, projection[i], 0, keyProjection[i].length);
+        }
+
+        // seq
+        projection[keyProjection.length] = new int[] {numKeyFields};
+
+        // value kind
+        projection[keyProjection.length + 1] = new int[] {numKeyFields + 1};
+
+        // value
+        for (int i = 0; i < valueProjection.length; i++) {
+            int idx = keyProjection.length + 2 + i;
+            projection[idx] = new int[valueProjection[i].length];
+            System.arraycopy(valueProjection[i], 0, projection[idx], 0, valueProjection[i].length);
+            projection[idx][0] += numKeyFields + 2;
+        }
+
+        return projection;
+    }
+
     @VisibleForTesting
     public KeyValue copy(RowDataSerializer keySerializer, RowDataSerializer valueSerializer) {
         return new KeyValue()
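
To make the index arithmetic in KeyValue.project concrete, a worked example (values chosen
for illustration): with numKeyFields = 2 the full record layout is (key0, key1, seq,
valueKind, value0, value1), so projected value indexes are shifted by numKeyFields + 2 = 4:

    int[][] keyProjection = new int[][] {new int[] {0}};    // keep key0
    int[][] valueProjection = new int[][] {new int[] {1}};  // keep value1
    int[][] projection = KeyValue.project(keyProjection, valueProjection, 2);
    // projection == {{0}, {2}, {3}, {5}}:
    // key0 at 0, seq at 2, valueKind at 3, value1 at 1 + 4 = 5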
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileReader.java
index fa54477..f9ce7f3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileReader.java
@@ -22,6 +22,7 @@ import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connector.file.src.util.RecordAndPosition;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.FileFormat;
@@ -36,7 +37,12 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 
-/** Reads {@link KeyValue}s from sst files. */
+/**
+ * Reads {@link KeyValue}s from sst files.
+ *
+ * <p>NOTE: Sst files store records ordered by keys, without projections. If key projections are
+ * applied, the produced iterator is no longer ordered.
+ */
 public class SstFileReader {
 
     private final RowType keyType;
@@ -112,8 +118,14 @@ public class SstFileReader {
 
         private final RowType keyType;
         private final RowType valueType;
+        private final FileFormat fileFormat;
         private final FileStorePathFactory pathFactory;
-        private final BulkFormat<RowData, FileSourceSplit> readerFactory;
+
+        private int[][] keyProjection;
+        private int[][] valueProjection;
+        private RowType projectedKeyType;
+        private RowType projectedValueType;
+        private BulkFormat<RowData, FileSourceSplit> readerFactory;
 
         public Factory(
                 RowType keyType,
@@ -122,17 +134,42 @@ public class SstFileReader {
                 FileStorePathFactory pathFactory) {
             this.keyType = keyType;
             this.valueType = valueType;
+            this.fileFormat = fileFormat;
             this.pathFactory = pathFactory;
-            RowType recordType = KeyValue.schema(keyType, valueType);
-            this.readerFactory = fileFormat.createReaderFactory(recordType);
+
+            this.keyProjection = Projection.range(0, keyType.getFieldCount()).toNestedIndexes();
+            this.valueProjection = Projection.range(0, valueType.getFieldCount()).toNestedIndexes();
+            applyProjection();
+        }
+
+        public Factory withKeyProjection(int[][] projection) {
+            keyProjection = projection;
+            applyProjection();
+            return this;
+        }
+
+        public Factory withValueProjection(int[][] projection) {
+            valueProjection = projection;
+            applyProjection();
+            return this;
         }
 
         public SstFileReader create(BinaryRowData partition, int bucket) {
             return new SstFileReader(
-                    keyType,
-                    valueType,
+                    projectedKeyType,
+                    projectedValueType,
                     readerFactory,
                     pathFactory.createSstPathFactory(partition, bucket));
         }
+
+        private void applyProjection() {
+            projectedKeyType = (RowType) Projection.of(keyProjection).project(keyType);
+            projectedValueType = (RowType) Projection.of(valueProjection).project(valueType);
+
+            RowType recordType = KeyValue.schema(keyType, valueType);
+            int[][] projection =
+                    KeyValue.project(keyProjection, valueProjection, keyType.getFieldCount());
+            readerFactory = fileFormat.createReaderFactory(recordType, projection);
+        }
     }
 }
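
A sketch of how the new Factory setters compose (identifiers assumed to be in scope; this
mirrors what the tests below do):

    SstFileReader reader =
            new SstFileReader.Factory(keyType, valueType, fileFormat, pathFactory)
                    .withKeyProjection(new int[][] {new int[] {1}})
                    .withValueProjection(new int[][] {new int[] {0}})
                    .create(partition, bucket);
    // NOTE: once a key projection is applied, reader.read(...) no longer
    // yields key-values ordered by key.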
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
index 33a6233..cbf95e4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
@@ -34,7 +34,18 @@ public interface FileStoreRead {
     /** With value nested projection. */
     void withValueProjection(int[][] projectedFields);
 
-    /** Create a {@link RecordReader} from partition and bucket and files. */
+    /**
+     * Create a {@link RecordReader} from partition, bucket and files.
+     *
+     * <p>The resulting reader has the following characteristics:
+     *
+     * <ul>
+     *   <li>If {@link FileStoreRead#withKeyProjection} is called, key-values produced by this
+     *       reader may be unordered and may contain duplicated keys.
+     *   <li>If {@link FileStoreRead#withKeyProjection} is not called, key-values produced by this
+     *       reader are guaranteed to be ordered by keys and do not contain duplicated keys.
+     * </ul>
+     */
     RecordReader createReader(BinaryRowData partition, int bucket, List<SstFileMeta> files)
             throws IOException;
 }
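
A hedged caller sketch for the contract documented above (read, partition, bucket and
files are assumed to be in scope): once a key projection is set, the resulting reader may
emit unordered, duplicated keys, so callers must not assume merged output.

    read.withKeyProjection(new int[][] {new int[] {1}});  // keys may now be unordered
    RecordReader reader = read.createReader(partition, bucket, files);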
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
index 6c5383b..0df5aa7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
 import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
 import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileReader;
@@ -31,6 +32,7 @@ import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 
@@ -41,6 +43,8 @@ public class FileStoreReadImpl implements FileStoreRead {
     private final Comparator<RowData> keyComparator;
     private final Accumulator accumulator;
 
+    private boolean keyProjected;
+
     public FileStoreReadImpl(
             RowType keyType,
             RowType valueType,
@@ -52,28 +56,42 @@ public class FileStoreReadImpl implements FileStoreRead {
                 new SstFileReader.Factory(keyType, valueType, fileFormat, pathFactory);
         this.keyComparator = keyComparator;
         this.accumulator = accumulator;
+
+        this.keyProjected = false;
     }
 
     @Override
     public void withKeyProjection(int[][] projectedFields) {
-        // TODO
-        throw new UnsupportedOperationException();
+        sstFileReaderFactory.withKeyProjection(projectedFields);
+        keyProjected = true;
     }
 
     @Override
     public void withValueProjection(int[][] projectedFields) {
-        // TODO
-        throw new UnsupportedOperationException();
+        sstFileReaderFactory.withValueProjection(projectedFields);
     }
 
     @Override
     public RecordReader createReader(BinaryRowData partition, int bucket, List<SstFileMeta> files)
             throws IOException {
-        return new MergeTreeReader(
-                new IntervalPartition(files, keyComparator).partition(),
-                true,
-                sstFileReaderFactory.create(partition, bucket),
-                keyComparator,
-                accumulator.copy());
+        SstFileReader sstFileReader = sstFileReaderFactory.create(partition, bucket);
+        if (keyProjected) {
+            // key projection has been applied, so sst readers will not return key-values in order;
+            // we have to return the raw file contents without merging
+            List<ConcatRecordReader.ReaderSupplier> suppliers = new ArrayList<>();
+            for (SstFileMeta file : files) {
+                suppliers.add(() -> sstFileReader.read(file.fileName()));
+            }
+            return ConcatRecordReader.create(suppliers);
+        } else {
+            // key projection is not applied, so sst readers will return key-values in order;
+            // in this case the merge tree can merge records with the same key for us
+            return new MergeTreeReader(
+                    new IntervalPartition(files, keyComparator).partition(),
+                    true,
+                    sstFileReader,
+                    keyComparator,
+                    accumulator.copy());
+        }
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
index b83efb4..e5bde46 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
@@ -18,16 +18,12 @@
 
 package org.apache.flink.table.store.file.mergetree.sst;
 
-import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.binary.BinaryRowDataUtil;
-import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueSerializerTest;
@@ -35,17 +31,24 @@ import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.stats.FieldStats;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FlushingAvroFormat;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.util.CloseableIterator;
 
 import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -71,29 +74,15 @@ public class SstFileTest {
 
         checkRollingFiles(data.meta, actualMetas, writer.suggestedFileSize());
 
-        SstFileReader reader = createSstFileReader(tempDir.toString());
-        Iterator<KeyValue> expectedIterator = data.content.iterator();
-        for (SstFileMeta meta : actualMetas) {
-            // check the contents of sst file
-            CloseableIterator<KeyValue> actualKvsIterator =
-                    new RecordReaderIterator(reader.read(meta.fileName()));
-            while (actualKvsIterator.hasNext()) {
-                assertThat(expectedIterator.hasNext()).isTrue();
-                KeyValue actualKv = actualKvsIterator.next();
-                assertThat(
-                                KeyValueSerializerTest.equals(
-                                        expectedIterator.next(),
-                                        actualKv,
-                                        TestKeyValueGenerator.KEY_SERIALIZER,
-                                        TestKeyValueGenerator.ROW_SERIALIZER))
-                        .isTrue();
-            }
-            actualKvsIterator.close();
-
-            // check that each sst file meta is serializable
-            assertThat(serializer.fromRow(serializer.toRow(meta))).isEqualTo(meta);
-        }
-        assertThat(expectedIterator.hasNext()).isFalse();
+        SstFileReader reader = createSstFileReader(tempDir.toString(), null, null);
+        assertData(
+                data,
+                actualMetas,
+                TestKeyValueGenerator.KEY_SERIALIZER,
+                TestKeyValueGenerator.ROW_SERIALIZER,
+                serializer,
+                reader,
+                kv -> kv);
     }
 
     @RepeatedTest(10)
@@ -118,6 +107,88 @@ public class SstFileTest {
         }
     }
 
+    @Test
+    public void testKeyProjection() throws Exception {
+        SstTestDataGenerator.Data data = gen.next();
+        SstFileWriter sstFileWriter = createSstFileWriter(tempDir.toString());
+        SstFileMetaSerializer serializer =
+                new SstFileMetaSerializer(
+                        TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.ROW_TYPE);
+        List<SstFileMeta> actualMetas =
+                sstFileWriter.write(CloseableIterator.fromList(data.content, kv -> {}), 0);
+
+        // projection: (shopId, orderId) -> (orderId)
+        SstFileReader sstFileReader =
+                createSstFileReader(tempDir.toString(), new int[][] {new int[] {1}}, null);
+        RowType projectedKeyType =
+                RowType.of(new LogicalType[] {new BigIntType(false)}, new String[] {"key_orderId"});
+        RowDataSerializer projectedKeySerializer = new RowDataSerializer(projectedKeyType);
+        assertData(
+                data,
+                actualMetas,
+                projectedKeySerializer,
+                TestKeyValueGenerator.ROW_SERIALIZER,
+                serializer,
+                sstFileReader,
+                kv ->
+                        new KeyValue()
+                                .replace(
+                                        GenericRowData.of(kv.key().getLong(1)),
+                                        kv.sequenceNumber(),
+                                        kv.valueKind(),
+                                        kv.value()));
+    }
+
+    @Test
+    public void testValueProjection() throws Exception {
+        SstTestDataGenerator.Data data = gen.next();
+        SstFileWriter sstFileWriter = createSstFileWriter(tempDir.toString());
+        SstFileMetaSerializer serializer =
+                new SstFileMetaSerializer(
+                        TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.ROW_TYPE);
+        List<SstFileMeta> actualMetas =
+                sstFileWriter.write(CloseableIterator.fromList(data.content, kv -> {}), 0);
+
+        // projection:
+        // (dt, hr, shopId, orderId, itemId, priceAmount, comment) ->
+        // (shopId, itemId, dt, hr)
+        SstFileReader sstFileReader =
+                createSstFileReader(
+                        tempDir.toString(),
+                        null,
+                        new int[][] {new int[] {2}, new int[] {4}, new int[] {0}, new int[] {1}});
+        RowType projectedValueType =
+                RowType.of(
+                        new LogicalType[] {
+                            new IntType(false),
+                            new BigIntType(),
+                            new VarCharType(false, 8),
+                            new IntType(false)
+                        },
+                        new String[] {"shopId", "itemId", "dt", "hr"});
+        RowDataSerializer projectedValueSerializer = new RowDataSerializer(projectedValueType);
+        assertData(
+                data,
+                actualMetas,
+                TestKeyValueGenerator.KEY_SERIALIZER,
+                projectedValueSerializer,
+                serializer,
+                sstFileReader,
+                kv ->
+                        new KeyValue()
+                                .replace(
+                                        kv.key(),
+                                        kv.sequenceNumber(),
+                                        kv.valueKind(),
+                                        GenericRowData.of(
+                                                kv.value().getInt(2),
+                                                kv.value().isNullAt(4)
+                                                        ? null
+                                                        : kv.value().getLong(4),
+                                                kv.value().getString(0),
+                                                kv.value().getInt(1))));
+    }
+
     private SstFileWriter createSstFileWriter(String path) {
         FileStorePathFactory pathFactory = new FileStorePathFactory(new Path(path));
         int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
@@ -133,7 +204,8 @@ public class SstFileTest {
                 .create(BinaryRowDataUtil.EMPTY_ROW, 0);
     }
 
-    private SstFileReader createSstFileReader(String path) {
+    private SstFileReader createSstFileReader(
+            String path, int[][] keyProjection, int[][] valueProjection) {
         FileStorePathFactory pathFactory = new FileStorePathFactory(new Path(path));
         SstFileReader.Factory factory =
                 new SstFileReader.Factory(
@@ -141,9 +213,49 @@ public class SstFileTest {
                         TestKeyValueGenerator.ROW_TYPE,
                         flushingAvro,
                         pathFactory);
+        if (keyProjection != null) {
+            factory.withKeyProjection(keyProjection);
+        }
+        if (valueProjection != null) {
+            factory.withValueProjection(valueProjection);
+        }
         return factory.create(BinaryRowDataUtil.EMPTY_ROW, 0);
     }
 
+    private void assertData(
+            SstTestDataGenerator.Data data,
+            List<SstFileMeta> actualMetas,
+            RowDataSerializer keySerializer,
+            RowDataSerializer projectedValueSerializer,
+            SstFileMetaSerializer sstFileMetaSerializer,
+            SstFileReader sstFileReader,
+            Function<KeyValue, KeyValue> toExpectedKv)
+            throws Exception {
+        Iterator<KeyValue> expectedIterator = data.content.iterator();
+        for (SstFileMeta meta : actualMetas) {
+            // check the contents of sst file
+            CloseableIterator<KeyValue> actualKvsIterator =
+                    new RecordReaderIterator(sstFileReader.read(meta.fileName()));
+            while (actualKvsIterator.hasNext()) {
+                assertThat(expectedIterator.hasNext()).isTrue();
+                KeyValue actualKv = actualKvsIterator.next();
+                assertThat(
+                                KeyValueSerializerTest.equals(
+                                        toExpectedKv.apply(expectedIterator.next()),
+                                        actualKv,
+                                        keySerializer,
+                                        projectedValueSerializer))
+                        .isTrue();
+            }
+            actualKvsIterator.close();
+
+            // check that each sst file meta is serializable
+            assertThat(sstFileMetaSerializer.fromRow(sstFileMetaSerializer.toRow(meta)))
+                    .isEqualTo(meta);
+        }
+        assertThat(expectedIterator.hasNext()).isFalse();
+    }
+
     private void checkRollingFiles(
             SstFileMeta expected, List<SstFileMeta> actual, long suggestedFileSize) {
         // all but last file should be no smaller than suggestedFileSize
@@ -216,43 +328,4 @@ public class SstFileTest {
         assertThat(actual.stream().mapToLong(FieldStats::nullCount).sum())
                 .isEqualTo(expected.nullCount());
     }
-
-    /** A special avro {@link FileFormat} which flushes for every added element. */
-    public static class FlushingAvroFormat implements FileFormat {
-
-        private final FileFormat avro =
-                FileFormat.fromIdentifier(
-                        SstFileTest.class.getClassLoader(), "avro", new Configuration());
-
-        @Override
-        public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
-                RowType type, List<ResolvedExpression> filters) {
-            return avro.createReaderFactory(type, filters);
-        }
-
-        @Override
-        public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
-            return fsDataOutputStream -> {
-                BulkWriter<RowData> wrapped =
-                        avro.createWriterFactory(type).create(fsDataOutputStream);
-                return new BulkWriter<RowData>() {
-                    @Override
-                    public void addElement(RowData rowData) throws IOException {
-                        wrapped.addElement(rowData);
-                        wrapped.flush();
-                    }
-
-                    @Override
-                    public void flush() throws IOException {
-                        wrapped.flush();
-                    }
-
-                    @Override
-                    public void finish() throws IOException {
-                        wrapped.finish();
-                    }
-                };
-            };
-        }
-    }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
new file mode 100644
index 0000000..406a214
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.TestFileStore;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.ValueCountAccumulator;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FileStoreReadImpl}. */
+public class FileStoreReadTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @BeforeEach
+    public void beforeEach() throws IOException {
+        Path root = new Path(tempDir.toString());
+        root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
+    }
+
+    @Test
+    public void testKeyProjection() throws Exception {
+        // (a, b, c) -> (b, a), c is the partition, all integers are in range [0, 2]
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int numRecords = random.nextInt(1000) + 1;
+        List<KeyValue> data = new ArrayList<>();
+        Map<Integer, Long> expected = new HashMap<>();
+        for (int i = 0; i < numRecords; i++) {
+            int a = random.nextInt(3);
+            int b = random.nextInt(3);
+            int c = random.nextInt(3);
+            long delta = random.nextLong(21) - 10;
+            expected.compute(b * 10 + a, (k, v) -> v == null ? delta : v + delta);
+            data.add(
+                    new KeyValue()
+                            .replace(
+                                    GenericRowData.of(a, b, c),
+                                    i,
+                                    ValueKind.ADD,
+                                    GenericRowData.of(delta)));
+        }
+
+        RowType partitionType =
+                RowType.of(new LogicalType[] {new IntType(false)}, new String[] {"c"});
+        RowDataSerializer partitionSerializer = new RowDataSerializer(partitionType);
+        RowType keyType =
+                RowType.of(
+                        new LogicalType[] {
+                            new IntType(false), new IntType(false), new IntType(false)
+                        },
+                        new String[] {"a", "b", "c"});
+        RowType projectedKeyType = RowType.of(new IntType(false), new IntType(false));
+        RowDataSerializer projectedKeySerializer = new RowDataSerializer(projectedKeyType);
+        RowType valueType =
+                RowType.of(new LogicalType[] {new BigIntType(false)}, new String[] {"count"});
+        RowDataSerializer valueSerializer = new RowDataSerializer(valueType);
+
+        TestFileStore store =
+                createStore(partitionType, keyType, valueType, new ValueCountAccumulator());
+        List<KeyValue> readData =
+                writeThenRead(
+                        data,
+                        new int[][] {new int[] {1}, new int[] {0}},
+                        null,
+                        projectedKeySerializer,
+                        valueSerializer,
+                        store,
+                        kv ->
+                                partitionSerializer
+                                        .toBinaryRow(GenericRowData.of(kv.key().getInt(2)))
+                                        .copy());
+        Map<Integer, Long> actual = new HashMap<>();
+        for (KeyValue kv : readData) {
+            assertThat(kv.key().getArity()).isEqualTo(2);
+            int key = kv.key().getInt(0) * 10 + kv.key().getInt(1);
+            long delta = kv.value().getLong(0);
+            actual.compute(key, (k, v) -> v == null ? delta : v + delta);
+        }
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    public void testValueProjection() throws Exception {
+        // (dt, hr, shopId, orderId, itemId, priceAmount, comment) -> (shopId, itemId, dt, hr)
+
+        TestKeyValueGenerator gen = new TestKeyValueGenerator();
+        int numRecords = ThreadLocalRandom.current().nextInt(1000) + 1;
+        List<KeyValue> data = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            data.add(gen.next());
+        }
+        TestFileStore store =
+                createStore(
+                        TestKeyValueGenerator.PARTITION_TYPE,
+                        TestKeyValueGenerator.KEY_TYPE,
+                        TestKeyValueGenerator.ROW_TYPE,
+                        new DeduplicateAccumulator());
+
+        RowDataSerializer projectedValueSerializer =
+                new RowDataSerializer(
+                        new IntType(false),
+                        new BigIntType(),
+                        new VarCharType(false, 8),
+                        new IntType(false));
+        Map<BinaryRowData, BinaryRowData> expected = store.toKvMap(data);
+        expected.replaceAll(
+                (k, v) ->
+                        projectedValueSerializer
+                                .toBinaryRow(
+                                        GenericRowData.of(
+                                                v.getInt(2),
+                                                v.isNullAt(4) ? null : v.getLong(4),
+                                                v.getString(0),
+                                                v.getInt(1)))
+                                .copy());
+
+        List<KeyValue> readData =
+                writeThenRead(
+                        data,
+                        null,
+                        new int[][] {new int[] {2}, new int[] {4}, new int[] {0}, new int[] {1}},
+                        TestKeyValueGenerator.KEY_SERIALIZER,
+                        projectedValueSerializer,
+                        store,
+                        gen::getPartition);
+        for (KeyValue kv : readData) {
+            assertThat(kv.value().getArity()).isEqualTo(4);
+            BinaryRowData key = TestKeyValueGenerator.KEY_SERIALIZER.toBinaryRow(kv.key());
+            BinaryRowData value = projectedValueSerializer.toBinaryRow(kv.value());
+            assertThat(expected).containsKey(key);
+            assertThat(value).isEqualTo(expected.get(key));
+        }
+    }
+
+    private List<KeyValue> writeThenRead(
+            List<KeyValue> data,
+            int[][] keyProjection,
+            int[][] valueProjection,
+            RowDataSerializer projectedKeySerializer,
+            RowDataSerializer projectedValueSerializer,
+            TestFileStore store,
+            Function<KeyValue, BinaryRowData> partitionCalculator)
+            throws Exception {
+        store.commitData(data, partitionCalculator, kv -> 0);
+        FileStoreScan scan = store.newScan();
+        Map<BinaryRowData, List<ManifestEntry>> filesGroupedByPartition =
+                scan.withSnapshot(store.pathFactory().latestSnapshotId()).plan().files().stream()
+                        .collect(Collectors.groupingBy(ManifestEntry::partition));
+        FileStoreRead read = store.newRead();
+        if (keyProjection != null) {
+            read.withKeyProjection(keyProjection);
+        }
+        if (valueProjection != null) {
+            read.withValueProjection(valueProjection);
+        }
+
+        List<KeyValue> result = new ArrayList<>();
+        for (Map.Entry<BinaryRowData, List<ManifestEntry>> entry :
+                filesGroupedByPartition.entrySet()) {
+            RecordReader reader =
+                    read.createReader(
+                            entry.getKey(),
+                            0,
+                            entry.getValue().stream()
+                                    .map(ManifestEntry::file)
+                                    .collect(Collectors.toList()));
+            RecordReaderIterator actualIterator = new RecordReaderIterator(reader);
+            while (actualIterator.hasNext()) {
+                result.add(
+                        actualIterator
+                                .next()
+                                .copy(projectedKeySerializer, projectedValueSerializer));
+            }
+        }
+        return result;
+    }
+
+    private TestFileStore createStore(
+            RowType partitionType, RowType keyType, RowType valueType, Accumulator accumulator) {
+        return TestFileStore.create(
+                "avro", tempDir.toString(), 1, partitionType, keyType, valueType, accumulator);
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java
index 5279b37..a6ff06b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java
@@ -40,8 +40,8 @@ public class FlushingAvroFormat implements FileFormat {
 
     @Override
     public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
-            RowType type, List<ResolvedExpression> filters) {
-        return avro.createReaderFactory(type, filters);
+            RowType type, int[][] projection, List<ResolvedExpression> filters) {
+        return avro.createReaderFactory(type, projection, filters);
     }
 
     @Override