Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/23 12:34:30 UTC

[flink-table-store] branch master updated: [FLINK-26791] Finish projection push down for table store

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f223c2  [FLINK-26791] Finish projection push down for table store
8f223c2 is described below

commit 8f223c20ddd2d9293a3c64271a8451c1db265635
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Wed Mar 23 20:34:26 2022 +0800

    [FLINK-26791] Finish projection push down for table store
    
    This closes #57
---
 .../flink/table/store/connector/TableStore.java    | 12 +++-
 .../store/connector/source/FileStoreSource.java    | 13 +++--
 .../connector/source/FileStoreSourceReader.java    |  9 ++-
 .../source/FileStoreSourceSplitReader.java         | 27 ++++++++-
 .../store/connector/source/TableStoreSource.java   |  3 +-
 .../table/store/connector/FileStoreITCase.java     | 57 ++++++++++++++++++-
 .../store/connector/sink/LogStoreSinkITCase.java   |  2 +-
 .../source/FileStoreSourceReaderTest.java          |  2 +-
 .../table/store/log/LogStoreTableFactory.java      |  5 +-
 ...ema.java => KafkaLogDeserializationSchema.java} | 66 ++++++++++++++++++----
 .../table/store/kafka/KafkaLogSourceProvider.java  | 16 ++++--
 .../table/store/kafka/KafkaLogStoreFactory.java    |  7 ++-
 .../flink/table/store/kafka/KafkaLogITCase.java    | 57 +++++++++++++------
 .../store/kafka/KafkaLogSerializationTest.java     |  4 +-
 14 files changed, 230 insertions(+), 50 deletions(-)
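
For context before the diffs: the projection push down in this commit builds on Flink's org.apache.flink.table.connector.Projection and org.apache.flink.table.data.utils.ProjectedRowData, both of which appear below. A minimal standalone sketch of how a projection narrows a row type and re-indexes field access (the schema and values are illustrative, not taken from the table store):

    import org.apache.flink.table.connector.Projection;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.data.utils.ProjectedRowData;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;

    public class ProjectionSketch {
        public static void main(String[] args) {
            // illustrative schema: ROW<id INT, name STRING, dt STRING>
            RowType type =
                    RowType.of(
                            new LogicalType[] {new IntType(), new VarCharType(), new VarCharType()},
                            new String[] {"id", "name", "dt"});

            // keep fields 1 and 2 -> produced type is ROW<name, dt>
            Projection projection = Projection.of(new int[][] {{1}, {2}});
            LogicalType producedType = projection.project(type);
            System.out.println(producedType);

            // ProjectedRowData re-maps field positions without copying the row
            RowData row =
                    GenericRowData.of(1, StringData.fromString("a"), StringData.fromString("p1"));
            RowData projected = ProjectedRowData.from(projection.toNestedIndexes()).replaceRow(row);
            System.out.println(projected.getString(0)); // field 1 of the original row: "a"
        }
    }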

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index a21ebfe..051fd0e 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.table.catalog.CatalogLock;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.store.connector.sink.BucketStreamPartitioner;
@@ -52,6 +53,7 @@ import org.apache.flink.table.store.log.LogSinkProvider;
 import org.apache.flink.table.store.log.LogSourceProvider;
 import org.apache.flink.table.store.utils.TypeUtils;
 import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
@@ -60,6 +62,7 @@ import javax.annotation.Nullable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -286,8 +289,6 @@ public class TableStore {
                     return buildFileSource(true);
                 }
 
-                // TODO project log source
-
                 if (isHybrid) {
                     return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
                                     buildFileSource(false))
@@ -304,11 +305,16 @@ public class TableStore {
         }
 
         public DataStreamSource<RowData> build(StreamExecutionEnvironment env) {
+            LogicalType produceType =
+                    Optional.ofNullable(projectedFields)
+                            .map(Projection::of)
+                            .map(p -> p.project(type))
+                            .orElse(type);
             return env.fromSource(
                     build(),
                     WatermarkStrategy.noWatermarks(),
                     tableIdentifier.asSummaryString(),
-                    InternalTypeInfo.of(type));
+                    InternalTypeInfo.of(produceType));
         }
     }
 
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index ba38ae3..93faca2 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -93,17 +93,22 @@ public class FileStoreSource
             read.withDropDelete(false);
         }
 
+        int[][] valueCountModeProjects = null;
         if (projectedFields != null) {
             if (valueCountMode) {
-                // TODO when isContinuous is false, don't project keys, and add key projection to
-                // split reader
-                read.withKeyProjection(projectedFields);
+                // in continuous read mode the merge cannot be performed anyway, so push the
+                // key projection down to the file store for better performance
+                if (isContinuous) {
+                    read.withKeyProjection(projectedFields);
+                } else {
+                    valueCountModeProjects = projectedFields;
+                }
             } else {
                 read.withValueProjection(projectedFields);
             }
         }
 
-        return new FileStoreSourceReader(context, read, valueCountMode);
+        return new FileStoreSourceReader(context, read, valueCountMode, valueCountModeProjects);
     }
 
     @Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java
index 66ebc46..6a5a263 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java
@@ -25,6 +25,8 @@ import org.apache.flink.connector.files.shaded.org.apache.flink.connector.base.s
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.operation.FileStoreRead;
 
+import javax.annotation.Nullable;
+
 import java.util.Map;
 
 /** A {@link SourceReader} that read records from {@link FileStoreSourceSplit}. */
@@ -38,9 +40,12 @@ public final class FileStoreSourceReader
     public FileStoreSourceReader(
             SourceReaderContext readerContext,
             FileStoreRead fileStoreRead,
-            boolean valueCountMode) {
+            boolean valueCountMode,
+            @Nullable int[][] valueCountModeProjects) {
         super(
-                () -> new FileStoreSourceSplitReader(fileStoreRead, valueCountMode),
+                () ->
+                        new FileStoreSourceSplitReader(
+                                fileStoreRead, valueCountMode, valueCountModeProjects),
                 (element, output, splitState) -> {
                     output.collect(element.getRecord());
                     splitState.setPosition(element);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
index 728e1a0..82931d3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.connector.source;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connector.file.src.impl.FileRecords;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
@@ -28,6 +29,7 @@ import org.apache.flink.connector.files.shaded.org.apache.flink.connector.base.s
 import org.apache.flink.connector.files.shaded.org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.files.shaded.org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.operation.FileStoreRead;
@@ -38,6 +40,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.LinkedList;
+import java.util.Optional;
 import java.util.Queue;
 
 /** The {@link SplitReader} implementation for the file store source. */
@@ -46,6 +49,7 @@ public class FileStoreSourceSplitReader
 
     private final FileStoreRead fileStoreRead;
     private final boolean valueCountMode;
+    @Nullable private final int[][] valueCountModeProjects;
 
     private final Queue<FileStoreSourceSplit> splits;
 
@@ -56,9 +60,18 @@ public class FileStoreSourceSplitReader
     private long currentNumRead;
     private RecordReader.RecordIterator currentFirstBatch;
 
+    @VisibleForTesting
     public FileStoreSourceSplitReader(FileStoreRead fileStoreRead, boolean valueCountMode) {
+        this(fileStoreRead, valueCountMode, null);
+    }
+
+    public FileStoreSourceSplitReader(
+            FileStoreRead fileStoreRead,
+            boolean valueCountMode,
+            @Nullable int[][] valueCountModeProjects) {
         this.fileStoreRead = fileStoreRead;
         this.valueCountMode = valueCountMode;
+        this.valueCountModeProjects = valueCountModeProjects;
         this.splits = new LinkedList<>();
         this.pool = new Pool<>(1);
         this.pool.add(
@@ -221,6 +234,12 @@ public class FileStoreSourceSplitReader
         private long count = 0;
 
         @Nullable
+        private final ProjectedRowData projectedRow =
+                Optional.ofNullable(valueCountModeProjects)
+                        .map(ProjectedRowData::from)
+                        .orElse(null);
+
+        @Nullable
         @Override
         public RecordAndPosition<RowData> next() {
             try {
@@ -240,8 +259,9 @@ public class FileStoreSourceSplitReader
                     if (value < 0) {
                         row.setRowKind(RowKind.DELETE);
                     }
-                    recordAndPosition.setNext(row);
+                    setNext(row);
                 } else {
+                    // move forward recordSkipCount
                     recordAndPosition.setNext(recordAndPosition.getRecord());
                 }
                 count--;
@@ -251,5 +271,10 @@ public class FileStoreSourceSplitReader
                 throw new RuntimeException(e);
             }
         }
+
+        private void setNext(RowData row) {
+            row = projectedRow == null ? row : projectedRow.replaceRow(row);
+            recordAndPosition.setNext(row);
+        }
     }
 }
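
The value-count-mode projection above is applied lazily in the split reader: a single ProjectedRowData is created and replaceRow(...) re-targets it at each emitted row instead of copying fields. A small standalone illustration of that reuse pattern (rows and indexes are made up for the example):

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.data.utils.ProjectedRowData;

    import java.util.Arrays;
    import java.util.List;

    public class ProjectedRowReuseSketch {
        public static void main(String[] args) {
            List<RowData> batch =
                    Arrays.asList(
                            GenericRowData.of(1, StringData.fromString("p1"), 100L),
                            GenericRowData.of(2, StringData.fromString("p2"), 200L));

            // keep columns 1 and 2 only; one wrapper instance serves the whole batch
            ProjectedRowData projected = ProjectedRowData.from(new int[][] {{1}, {2}});
            for (RowData row : batch) {
                RowData out = projected.replaceRow(row); // no per-row allocation
                System.out.println(out.getString(0) + ", " + out.getLong(1));
            }
        }
    }
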
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 171ccb5..11da93e 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -128,7 +128,8 @@ public class TableStoreSource
                                     return scanContext.createDataStructureConverter(
                                             producedDataType);
                                 }
-                            });
+                            },
+                            projectFields);
         }
         TableStore.SourceBuilder builder =
                 tableStore
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 95e1f9b..ed4c493 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
@@ -213,6 +214,54 @@ public class FileStoreITCase extends AbstractTestBase {
     }
 
     @Test
+    public void testKeyedProjection() throws Exception {
+        testProjection();
+    }
+
+    @Test
+    public void testNonKeyedProjection() throws Exception {
+        store.withPrimaryKeys(new int[0]);
+        testProjection();
+    }
+
+    private void testProjection() throws Exception {
+        // write
+        store.sinkBuilder().withInput(buildTestSource(env, isBatch)).build();
+        env.execute();
+
+        // read
+        Projection projection = Projection.of(new int[] {1, 2});
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        DataStructureConverter<RowData, Row> converter =
+                (DataStructureConverter)
+                        DataStructureConverters.getConverter(
+                                TypeConversions.fromLogicalToDataType(
+                                        projection.project(TABLE_TYPE)));
+        List<Row> results =
+                executeAndCollect(
+                        store.sourceBuilder()
+                                .withProjection(projection.toNestedIndexes())
+                                .build(env),
+                        converter);
+
+        // assert
+        Row[] expected = new Row[] {Row.of("p2", 1), Row.of("p1", 2), Row.of("p2", 5)};
+        if (store.primaryKeys().isEmpty()) {
+            // in streaming mode, expect the original data twice (FiniteTestSource)
+            Stream<RowData> expectedStream =
+                    isBatch
+                            ? SOURCE_DATA.stream()
+                            : Stream.concat(SOURCE_DATA.stream(), SOURCE_DATA.stream());
+            expected =
+                    expectedStream
+                            .map(CONVERTER::toExternal)
+                            .map(r -> Row.of(r.getField(1), r.getField(2)))
+                            .toArray(Row[]::new);
+        }
+        assertThat(results).containsExactlyInAnyOrder(expected);
+    }
+
+    @Test
     public void testContinuous() throws Exception {
         store.withPrimaryKeys(new int[] {2});
         innerTestContinuous();
@@ -330,10 +379,16 @@ public class FileStoreITCase extends AbstractTestBase {
     }
 
     public static List<Row> executeAndCollect(DataStreamSource<RowData> source) throws Exception {
+        return executeAndCollect(source, CONVERTER);
+    }
+
+    public static List<Row> executeAndCollect(
+            DataStreamSource<RowData> source, DataStructureConverter<RowData, Row> converter)
+            throws Exception {
         CloseableIterator<RowData> iterator = source.executeAndCollect();
         List<Row> results = new ArrayList<>();
         while (iterator.hasNext()) {
-            results.add(CONVERTER.toExternal(iterator.next()));
+            results.add(converter.toExternal(iterator.next()));
         }
         iterator.close();
         return results;
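
The test above hands projection.toNestedIndexes() to the source builder; this int[][] nested form is the representation the commit threads through both the file store and the log store APIs. A short standalone sketch of the two index representations exposed by Projection (purely illustrative):

    import org.apache.flink.table.connector.Projection;

    import java.util.Arrays;

    public class ProjectionIndexesSketch {
        public static void main(String[] args) {
            Projection projection = Projection.of(new int[] {1, 2});

            // nested form, one inner array per retained field: [[1], [2]]
            System.out.println(Arrays.deepToString(projection.toNestedIndexes()));

            // top-level form, plain field positions: [1, 2]
            System.out.println(Arrays.toString(projection.toTopLevelIndexes()));
        }
    }
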
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
index d12848f..15e7be3 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
@@ -121,7 +121,7 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
         KafkaLogStoreFactory factory = discoverKafkaLogFactory();
         KafkaLogSinkProvider sinkProvider = factory.createSinkProvider(context, SINK_CONTEXT);
         KafkaLogSourceProvider sourceProvider =
-                factory.createSourceProvider(context, SOURCE_CONTEXT);
+                factory.createSourceProvider(context, SOURCE_CONTEXT, null);
 
         factory.onCreateTable(context, 3, true);
 
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
index d33afe1..d795ade 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
@@ -58,7 +58,7 @@ public class FileStoreSourceReaderTest {
 
     private FileStoreSourceReader createReader(TestingReaderContext context) {
         return new FileStoreSourceReader(
-                context, new TestDataReadWrite(tempDir.toString(), null).createRead(), false);
+                context, new TestDataReadWrite(tempDir.toString(), null).createRead(), false, null);
     }
 
     private static FileStoreSourceSplit createTestFileSplit() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
index 2300fcf..5f28a3d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
@@ -36,6 +36,8 @@ import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
 import org.apache.flink.table.factories.SerializationFormatFactory;
 import org.apache.flink.types.RowKind;
 
+import javax.annotation.Nullable;
+
 /**
  * Base interface for configuring a default log table connector. The log table is used by managed
  * table factory.
@@ -55,7 +57,8 @@ public interface LogStoreTableFactory extends DynamicTableFactory {
      * Creates a {@link LogSourceProvider} instance from a {@link CatalogTable} and additional
      * context information.
      */
-    LogSourceProvider createSourceProvider(Context context, SourceContext sourceContext);
+    LogSourceProvider createSourceProvider(
+            Context context, SourceContext sourceContext, @Nullable int[][] projectFields);
 
     /**
      * Creates a {@link LogSinkProvider} instance from a {@link CatalogTable} and additional context
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogKeyedDeserializationSchema.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java
similarity index 62%
rename from flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogKeyedDeserializationSchema.java
rename to flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java
index 50485c6..e17699f 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogKeyedDeserializationSchema.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java
@@ -21,8 +21,10 @@ package org.apache.flink.table.store.kafka;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
@@ -30,30 +32,41 @@ import org.apache.flink.util.Collector;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
+import javax.annotation.Nullable;
+
 import java.util.stream.IntStream;
 
 /** A {@link KafkaDeserializationSchema} for the table with primary key in log store. */
-public class KafkaLogKeyedDeserializationSchema implements KafkaDeserializationSchema<RowData> {
+public class KafkaLogDeserializationSchema implements KafkaDeserializationSchema<RowData> {
 
     private static final long serialVersionUID = 1L;
 
     private final TypeInformation<RowData> producedType;
     private final int fieldCount;
     private final int[] primaryKey;
-    private final DeserializationSchema<RowData> keyDeserializer;
+    @Nullable private final DeserializationSchema<RowData> keyDeserializer;
     private final DeserializationSchema<RowData> valueDeserializer;
     private final RowData.FieldGetter[] keyFieldGetters;
+    @Nullable private final int[][] projectFields;
+
+    private transient ProjectCollector projectCollector;
 
-    public KafkaLogKeyedDeserializationSchema(
+    public KafkaLogDeserializationSchema(
             DataType physicalType,
             int[] primaryKey,
-            DeserializationSchema<RowData> keyDeserializer,
-            DeserializationSchema<RowData> valueDeserializer) {
+            @Nullable DeserializationSchema<RowData> keyDeserializer,
+            DeserializationSchema<RowData> valueDeserializer,
+            @Nullable int[][] projectFields) {
         this.primaryKey = primaryKey;
         this.keyDeserializer = keyDeserializer;
         this.valueDeserializer = valueDeserializer;
-        this.producedType = InternalTypeInfo.of(physicalType.getLogicalType());
+        DataType projectedType =
+                projectFields == null
+                        ? physicalType
+                        : Projection.of(projectFields).project(physicalType);
+        this.producedType = InternalTypeInfo.of(projectedType.getLogicalType());
         this.fieldCount = physicalType.getChildren().size();
+        this.projectFields = projectFields;
         this.keyFieldGetters =
                 IntStream.range(0, primaryKey.length)
                         .mapToObj(
@@ -69,8 +82,11 @@ public class KafkaLogKeyedDeserializationSchema implements KafkaDeserializationS
 
     @Override
     public void open(DeserializationSchema.InitializationContext context) throws Exception {
-        keyDeserializer.open(context);
+        if (keyDeserializer != null) {
+            keyDeserializer.open(context);
+        }
         valueDeserializer.open(context);
+        projectCollector = new ProjectCollector();
     }
 
     @Override
@@ -85,17 +101,20 @@ public class KafkaLogKeyedDeserializationSchema implements KafkaDeserializationS
     }
 
     @Override
-    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> out)
+    public void deserialize(
+            ConsumerRecord<byte[], byte[]> record, Collector<RowData> underCollector)
             throws Exception {
-        if (record.value() == null) {
+        Collector<RowData> collector = projectCollector.project(underCollector);
+
+        if (primaryKey.length > 0 && record.value() == null) {
             RowData key = keyDeserializer.deserialize(record.key());
             GenericRowData value = new GenericRowData(RowKind.DELETE, fieldCount);
             for (int i = 0; i < primaryKey.length; i++) {
                 value.setField(primaryKey[i], keyFieldGetters[i].getFieldOrNull(key));
             }
-            out.collect(value);
+            collector.collect(value);
         } else {
-            valueDeserializer.deserialize(record.value(), out);
+            valueDeserializer.deserialize(record.value(), collector);
         }
     }
 
@@ -103,4 +122,29 @@ public class KafkaLogKeyedDeserializationSchema implements KafkaDeserializationS
     public TypeInformation<RowData> getProducedType() {
         return producedType;
     }
+
+    private class ProjectCollector implements Collector<RowData> {
+
+        private final ProjectedRowData projectedRow =
+                projectFields == null ? null : ProjectedRowData.from(projectFields);
+
+        private Collector<RowData> underCollector;
+
+        private Collector<RowData> project(Collector<RowData> underCollector) {
+            if (projectedRow == null) {
+                return underCollector;
+            }
+
+            this.underCollector = underCollector;
+            return this;
+        }
+
+        @Override
+        public void collect(RowData rowData) {
+            underCollector.collect(projectedRow.replaceRow(rowData));
+        }
+
+        @Override
+        public void close() {}
+    }
 }
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
index 68dbe63..d504802 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
@@ -56,6 +56,8 @@ public class KafkaLogSourceProvider implements LogSourceProvider {
 
     private final DeserializationSchema<RowData> valueDeserializer;
 
+    @Nullable private final int[][] projectFields;
+
     private final LogConsistency consistency;
 
     private final LogStartupMode scanMode;
@@ -69,6 +71,7 @@ public class KafkaLogSourceProvider implements LogSourceProvider {
             int[] primaryKey,
             @Nullable DeserializationSchema<RowData> keyDeserializer,
             DeserializationSchema<RowData> valueDeserializer,
+            @Nullable int[][] projectFields,
             LogConsistency consistency,
             LogStartupMode scanMode,
             @Nullable Long timestampMills) {
@@ -78,6 +81,7 @@ public class KafkaLogSourceProvider implements LogSourceProvider {
         this.primaryKey = primaryKey;
         this.keyDeserializer = keyDeserializer;
         this.valueDeserializer = valueDeserializer;
+        this.projectFields = projectFields;
         this.consistency = consistency;
         this.scanMode = scanMode;
         this.timestampMills = timestampMills;
@@ -109,11 +113,13 @@ public class KafkaLogSourceProvider implements LogSourceProvider {
 
     @VisibleForTesting
     KafkaRecordDeserializationSchema<RowData> createDeserializationSchema() {
-        return primaryKey.length > 0
-                ? KafkaRecordDeserializationSchema.of(
-                        new KafkaLogKeyedDeserializationSchema(
-                                physicalType, primaryKey, keyDeserializer, valueDeserializer))
-                : KafkaRecordDeserializationSchema.valueOnly(valueDeserializer);
+        return KafkaRecordDeserializationSchema.of(
+                new KafkaLogDeserializationSchema(
+                        physicalType,
+                        primaryKey,
+                        keyDeserializer,
+                        valueDeserializer,
+                        projectFields));
     }
 
     private OffsetsInitializer toOffsetsInitializer(@Nullable Map<Integer, Long> bucketOffsets) {
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
index 943d0b6..58386fb 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
@@ -38,6 +38,8 @@ import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -159,7 +161,9 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
 
     @Override
     public KafkaLogSourceProvider createSourceProvider(
-            DynamicTableFactory.Context context, SourceContext sourceContext) {
+            DynamicTableFactory.Context context,
+            SourceContext sourceContext,
+            @Nullable int[][] projectFields) {
         FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
         ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
         DataType physicalType = schema.toPhysicalRowDataType();
@@ -181,6 +185,7 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
                 primaryKey,
                 keyDeserializer,
                 valueDeserializer,
+                projectFields,
                 helper.getOptions().get(CONSISTENCY),
                 helper.getOptions().get(SCAN),
                 helper.getOptions().get(SCAN_TIMESTAMP_MILLS));
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
index bb18a11..ab05fa7 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.kafka;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory.Context;
 import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
@@ -138,8 +139,6 @@ public class KafkaLogITCase extends KafkaTableTestBase {
                 testContext(name, getBootstrapServers(), changelogMode, consistency, keyed);
 
         KafkaLogSinkProvider sinkProvider = factory.createSinkProvider(context, SINK_CONTEXT);
-        KafkaLogSourceProvider sourceProvider =
-                factory.createSourceProvider(context, SOURCE_CONTEXT);
 
         factory.onCreateTable(context, 3, true);
         try {
@@ -158,12 +157,10 @@ public class KafkaLogITCase extends KafkaTableTestBase {
 
             // 1.2 read
             List<RowData> records =
-                    env.fromSource(
-                                    sourceProvider.createSource(null),
-                                    WatermarkStrategy.noWatermarks(),
-                                    "source")
-                            .executeAndCollect(4);
-            records.sort(Comparator.comparingInt(o -> o.getInt(0)));
+                    collect(
+                            factory.createSourceProvider(context, SOURCE_CONTEXT, null)
+                                    .createSource(null),
+                            4);
 
             // delete, upsert mode
             if (changelogMode == LogChangelogMode.UPSERT) {
@@ -171,12 +168,30 @@ public class KafkaLogITCase extends KafkaTableTestBase {
             } else {
                 assertRow(records.get(0), RowKind.DELETE, 1, 2);
             }
-
             // inserts
             assertRow(records.get(1), RowKind.INSERT, 3, 4);
             assertRow(records.get(2), RowKind.INSERT, 5, 6);
             assertRow(records.get(3), RowKind.INSERT, 7, 8);
 
+            // 1.3 read with projection
+            records =
+                    collect(
+                            factory.createSourceProvider(
+                                            context, SOURCE_CONTEXT, new int[][] {new int[] {1}})
+                                    .createSource(null),
+                            4);
+
+            // delete, upsert mode
+            if (changelogMode == LogChangelogMode.UPSERT) {
+                assertValue(records.get(0), RowKind.DELETE, null);
+            } else {
+                assertValue(records.get(0), RowKind.DELETE, 2);
+            }
+            // inserts
+            assertValue(records.get(1), RowKind.INSERT, 4);
+            assertValue(records.get(2), RowKind.INSERT, 6);
+            assertValue(records.get(3), RowKind.INSERT, 8);
+
             // 2.1 sink
             env.fromElements(
                             testRecord(true, 0, 9, 10, RowKind.INSERT),
@@ -187,13 +202,10 @@ public class KafkaLogITCase extends KafkaTableTestBase {
 
             // 2.2 read from offsets
             records =
-                    env.fromSource(
-                                    sourceProvider.createSource(
-                                            TestOffsetsLogSink.drainOffsets(uuid)),
-                                    WatermarkStrategy.noWatermarks(),
-                                    "source")
-                            .executeAndCollect(3);
-            records.sort(Comparator.comparingInt(o -> o.getInt(0)));
+                    collect(
+                            factory.createSourceProvider(context, SOURCE_CONTEXT, null)
+                                    .createSource(TestOffsetsLogSink.drainOffsets(uuid)),
+                            3);
             assertRow(records.get(0), RowKind.INSERT, 9, 10);
             assertRow(records.get(1), RowKind.INSERT, 11, 12);
             assertRow(records.get(2), RowKind.INSERT, 13, 14);
@@ -202,6 +214,14 @@ public class KafkaLogITCase extends KafkaTableTestBase {
         }
     }
 
+    private List<RowData> collect(KafkaSource<RowData> source, int numRecord) throws Exception {
+        List<RowData> records =
+                env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
+                        .executeAndCollect(numRecord);
+        records.sort(Comparator.comparingInt(o -> o.getInt(0)));
+        return records;
+    }
+
     private void enableCheckpoint() {
         Configuration configuration = new Configuration();
         configuration.set(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
@@ -214,4 +234,9 @@ public class KafkaLogITCase extends KafkaTableTestBase {
         Assert.assertEquals(k, row.isNullAt(0) ? null : row.getInt(0));
         Assert.assertEquals(v, row.isNullAt(1) ? null : row.getInt(1));
     }
+
+    private void assertValue(RowData row, RowKind rowKind, Integer v) {
+        Assert.assertEquals(rowKind, row.getRowKind());
+        Assert.assertEquals(v, row.isNullAt(0) ? null : row.getInt(0));
+    }
 }
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
index c8461a0..1a48f77 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
@@ -38,7 +38,7 @@ import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.testContext;
 import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.testRecord;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for {@link KafkaLogSerializationSchema} and {@link KafkaLogKeyedDeserializationSchema}. */
+/** Test for {@link KafkaLogSerializationSchema} and {@link KafkaLogDeserializationSchema}. */
 public class KafkaLogSerializationTest {
 
     private static final String TOPIC = "my_topic";
@@ -140,7 +140,7 @@ public class KafkaLogSerializationTest {
     private static KafkaRecordDeserializationSchema<RowData> createTestDeserializationSchema(
             DynamicTableFactory.Context context) {
         return discoverKafkaLogFactory()
-                .createSourceProvider(context, KafkaLogTestUtils.SOURCE_CONTEXT)
+                .createSourceProvider(context, KafkaLogTestUtils.SOURCE_CONTEXT, null)
                 .createDeserializationSchema();
     }
 }