You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/11/17 15:51:45 UTC

[flink] branch master updated: [FLINK-24776][table] Clarify DecodingFormat and introduce ProjectableDecodingFormat

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e5111c9  [FLINK-24776][table] Clarify DecodingFormat and introduce ProjectableDecodingFormat
e5111c9 is described below

commit e5111c970877b5772f0326ffbc998e0f6a8d351f
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Thu Nov 11 10:53:43 2021 +0100

    [FLINK-24776][table] Clarify DecodingFormat and introduce ProjectableDecodingFormat
    
    Clearly separates projectable formats from non-projectable formats. Before this,
    the semantics were not 100% clear which led to inconsistent connector and format
    implementations.
    
    The FileSystemTableSource has been updated to distinguish between
    those two interfaces now. Users that implemented custom formats for
    FileSystemTableSource might need to verify the implementation.
    
    For convenience, we introduce helper classes such as ProjectedRowData
    and Projection to ease the implementation.
    
    This closes #17768.
---
 .../util/RecordMapperWrapperRecordIterator.java    |  33 +-
 .../confluent/RegistryAvroFormatFactory.java       |   8 +-
 .../debezium/DebeziumAvroFormatFactory.java        |   8 +-
 .../flink/formats/avro/AvroFormatFactory.java      |   9 +-
 .../flink/formats/json/JsonFormatFactory.java      |   9 +-
 .../json/canal/CanalJsonDecodingFormat.java        |   7 +-
 .../json/debezium/DebeziumJsonDecodingFormat.java  |   7 +-
 .../json/maxwell/MaxwellJsonDecodingFormat.java    |   8 +-
 .../org/apache/flink/orc/OrcFileFormatFactory.java | 124 +++----
 .../formats/parquet/ParquetFileFormatFactory.java  |  64 ++--
 .../apache/flink/table/connector/Projection.java   | 389 +++++++++++++++++++++
 .../table/connector/format/DecodingFormat.java     |  56 +++
 .../format/ProjectableDecodingFormat.java          |  71 ++++
 .../abilities/SupportsProjectionPushDown.java      |   5 +
 .../flink/table/data/utils/ProjectedRowData.java   | 235 +++++++++++++
 .../flink/table/connector/ProjectionTest.java      | 166 +++++++++
 .../table/data/utils/ProjectedRowDataTest.java     |  66 ++++
 .../filesystem/FileInfoExtractorBulkFormat.java    |  40 +--
 .../table/filesystem/FileSystemTableSource.java    | 106 ++++--
 .../table/filesystem/ProjectingBulkFormat.java     |  84 +++++
 .../filesystem/TestCsvFileSystemFormatFactory.java |  27 +-
 21 files changed, 1324 insertions(+), 198 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordMapperWrapperRecordIterator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordMapperWrapperRecordIterator.java
index 76c4fb0..6d916bf 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordMapperWrapperRecordIterator.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordMapperWrapperRecordIterator.java
@@ -20,9 +20,14 @@ package org.apache.flink.connector.file.src.util;
 
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
 /**
  * Implementation of {@link org.apache.flink.connector.file.src.reader.BulkFormat.RecordIterator}
- * that wraps another iterator and performs the mapping of the records.
+ * that wraps another iterator and performs the mapping of the records. You can use {@link
+ * #wrapReader(BulkFormat.Reader, RecordMapper)} to wrap a whole reader.
  *
  * @param <I> Input type
  * @param <O> Mapped output type
@@ -61,4 +66,30 @@ public class RecordMapperWrapperRecordIterator<I, O> implements BulkFormat.Recor
     public void releaseBatch() {
         this.wrapped.releaseBatch();
     }
+
+    /**
+     * Wrap a {@link BulkFormat.Reader} applying a {@link RecordMapper} on the returned iterator.
+     *
+     * @param <I> Input type
+     * @param <O> Mapped output type
+     */
+    public static <I, O> BulkFormat.Reader<O> wrapReader(
+            BulkFormat.Reader<I> wrappedReader, RecordMapper<I, O> recordMapper) {
+        return new BulkFormat.Reader<O>() {
+            @Nullable
+            @Override
+            public BulkFormat.RecordIterator<O> readBatch() throws IOException {
+                BulkFormat.RecordIterator<I> iterator = wrappedReader.readBatch();
+                if (iterator == null) {
+                    return null;
+                }
+                return new RecordMapperWrapperRecordIterator<>(iterator, recordMapper);
+            }
+
+            @Override
+            public void close() throws IOException {
+                wrappedReader.close();
+            }
+        };
+    }
 }
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
index c1d0ff81..d4771fe 100644
--- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
@@ -81,10 +82,13 @@ public class RegistryAvroFormatFactory
         String schemaRegistryURL = formatOptions.get(URL);
         Map<String, ?> optionalPropertiesMap = buildOptionalPropertiesMap(formatOptions);
 
-        return new DecodingFormat<DeserializationSchema<RowData>>() {
+        return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() {
             @Override
             public DeserializationSchema<RowData> createRuntimeDecoder(
-                    DynamicTableSource.Context context, DataType producedDataType) {
+                    DynamicTableSource.Context context,
+                    DataType producedDataType,
+                    int[][] projections) {
+                producedDataType = DataType.projectFields(producedDataType, projections);
                 final RowType rowType = (RowType) producedDataType.getLogicalType();
                 final TypeInformation<RowData> rowDataTypeInfo =
                         context.createTypeInformation(producedDataType);
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactory.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactory.java
index eaf069a..d889dee 100644
--- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactory.java
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactory.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
@@ -75,10 +76,13 @@ public class DebeziumAvroFormatFactory
         String schemaRegistryURL = formatOptions.get(URL);
         Map<String, ?> optionalPropertiesMap = buildOptionalPropertiesMap(formatOptions);
 
-        return new DecodingFormat<DeserializationSchema<RowData>>() {
+        return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() {
             @Override
             public DeserializationSchema<RowData> createRuntimeDecoder(
-                    DynamicTableSource.Context context, DataType producedDataType) {
+                    DynamicTableSource.Context context,
+                    DataType producedDataType,
+                    int[][] projections) {
+                producedDataType = DataType.projectFields(producedDataType, projections);
                 final RowType rowType = (RowType) producedDataType.getLogicalType();
                 final TypeInformation<RowData> producedTypeInfo =
                         context.createTypeInformation(producedDataType);
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java
index 7faa3e1..3860877 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
@@ -54,10 +55,14 @@ public class AvroFormatFactory implements DeserializationFormatFactory, Serializ
             DynamicTableFactory.Context context, ReadableConfig formatOptions) {
         FactoryUtil.validateFactoryOptions(this, formatOptions);
 
-        return new DecodingFormat<DeserializationSchema<RowData>>() {
+        return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() {
             @Override
             public DeserializationSchema<RowData> createRuntimeDecoder(
-                    DynamicTableSource.Context context, DataType producedDataType) {
+                    DynamicTableSource.Context context,
+                    DataType physicalDataType,
+                    int[][] projections) {
+                final DataType producedDataType =
+                        DataType.projectFields(physicalDataType, projections);
                 final RowType rowType = (RowType) producedDataType.getLogicalType();
                 final TypeInformation<RowData> rowDataTypeInfo =
                         context.createTypeInformation(producedDataType);
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
index 692973f..b78c024 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
@@ -28,6 +28,7 @@ import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
@@ -68,10 +69,14 @@ public class JsonFormatFactory implements DeserializationFormatFactory, Serializ
         final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
         TimestampFormat timestampOption = JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
 
-        return new DecodingFormat<DeserializationSchema<RowData>>() {
+        return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() {
             @Override
             public DeserializationSchema<RowData> createRuntimeDecoder(
-                    DynamicTableSource.Context context, DataType producedDataType) {
+                    DynamicTableSource.Context context,
+                    DataType physicalDataType,
+                    int[][] projections) {
+                final DataType producedDataType =
+                        DataType.projectFields(physicalDataType, projections);
                 final RowType rowType = (RowType) producedDataType.getLogicalType();
                 final TypeInformation<RowData> rowDataTypeInfo =
                         context.createTypeInformation(producedDataType);
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDecodingFormat.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDecodingFormat.java
index f958c8d..76f4e2e 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDecodingFormat.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDecodingFormat.java
@@ -25,6 +25,7 @@ import org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema.Metada
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -43,7 +44,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /** {@link DecodingFormat} for Canal using JSON encoding. */
-public class CanalJsonDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
+public class CanalJsonDecodingFormat
+        implements ProjectableDecodingFormat<DeserializationSchema<RowData>> {
 
     // --------------------------------------------------------------------------------------------
     // Mutable attributes
@@ -77,7 +79,8 @@ public class CanalJsonDecodingFormat implements DecodingFormat<DeserializationSc
 
     @Override
     public DeserializationSchema<RowData> createRuntimeDecoder(
-            DynamicTableSource.Context context, DataType physicalDataType) {
+            DynamicTableSource.Context context, DataType physicalDataType, int[][] projections) {
+        physicalDataType = DataType.projectFields(physicalDataType, projections);
         final List<ReadableMetadata> readableMetadata =
                 metadataKeys.stream()
                         .map(
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java
index 5db2f64..472596a 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java
@@ -25,6 +25,7 @@ import org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
@@ -43,7 +44,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /** {@link DecodingFormat} for Debezium using JSON encoding. */
-public class DebeziumJsonDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
+public class DebeziumJsonDecodingFormat
+        implements ProjectableDecodingFormat<DeserializationSchema<RowData>> {
 
     // --------------------------------------------------------------------------------------------
     // Mutable attributes
@@ -71,7 +73,8 @@ public class DebeziumJsonDecodingFormat implements DecodingFormat<Deserializatio
 
     @Override
     public DeserializationSchema<RowData> createRuntimeDecoder(
-            DynamicTableSource.Context context, DataType physicalDataType) {
+            DynamicTableSource.Context context, DataType physicalDataType, int[][] projections) {
+        physicalDataType = DataType.projectFields(physicalDataType, projections);
 
         final List<ReadableMetadata> readableMetadata =
                 metadataKeys.stream()
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDecodingFormat.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDecodingFormat.java
index d040386..8aae68f 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDecodingFormat.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDecodingFormat.java
@@ -25,6 +25,7 @@ import org.apache.flink.formats.json.maxwell.MaxwellJsonDeserializationSchema.Me
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -41,7 +42,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /** {@link DecodingFormat} for Maxwell using JSON encoding. */
-public class MaxwellJsonDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
+public class MaxwellJsonDecodingFormat
+        implements ProjectableDecodingFormat<DeserializationSchema<RowData>> {
 
     // --------------------------------------------------------------------------------------------
     // Mutable attributes
@@ -62,7 +64,9 @@ public class MaxwellJsonDecodingFormat implements DecodingFormat<Deserialization
 
     @Override
     public DeserializationSchema<RowData> createRuntimeDecoder(
-            DynamicTableSource.Context context, DataType physicalDataType) {
+            DynamicTableSource.Context context, DataType physicalDataType, int[][] projections) {
+        physicalDataType = DataType.projectFields(physicalDataType, projections);
+
         final List<ReadableMetadata> readableMetadata =
                 metadataKeys.stream()
                         .map(
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
index 7343959..d6d4e95 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
@@ -27,8 +27,10 @@ import org.apache.flink.orc.shim.OrcShim;
 import org.apache.flink.orc.vector.RowDataVectorizer;
 import org.apache.flink.orc.writer.OrcBulkWriterFactory;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.format.BulkDecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
@@ -38,8 +40,6 @@ import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.BulkReaderFormatFactory;
 import org.apache.flink.table.factories.BulkWriterFormatFactory;
 import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.filesystem.FileSystemConnectorOptions;
-import org.apache.flink.table.filesystem.PartitionFieldExtractor;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -48,6 +48,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.orc.TypeDescription;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
@@ -85,71 +86,7 @@ public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
     @Override
     public BulkDecodingFormat<RowData> createDecodingFormat(
             DynamicTableFactory.Context context, ReadableConfig formatOptions) {
-        return new BulkDecodingFormat<RowData>() {
-
-            private List<ResolvedExpression> filters;
-
-            @Override
-            public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
-                    DynamicTableSource.Context sourceContext, DataType producedDataType) {
-                List<OrcFilters.Predicate> orcPredicates = new ArrayList<>();
-
-                if (filters != null) {
-                    for (Expression pred : filters) {
-                        OrcFilters.Predicate orcPred = OrcFilters.toOrcPredicate(pred);
-                        if (orcPred != null) {
-                            orcPredicates.add(orcPred);
-                        }
-                    }
-                }
-
-                RowType tableType =
-                        (RowType)
-                                context.getCatalogTable()
-                                        .getSchema()
-                                        .toPhysicalRowDataType()
-                                        .getLogicalType();
-                List<String> tableFieldNames = tableType.getFieldNames();
-                RowType projectedType = (RowType) producedDataType.getLogicalType();
-
-                int[] selectedFields =
-                        projectedType.getFieldNames().stream()
-                                .mapToInt(tableFieldNames::indexOf)
-                                .toArray();
-
-                Properties properties = getOrcProperties(formatOptions);
-                Configuration conf = new Configuration();
-                properties.forEach((k, v) -> conf.set(k.toString(), v.toString()));
-
-                String defaultPartName =
-                        context.getCatalogTable()
-                                .getOptions()
-                                .getOrDefault(
-                                        FileSystemConnectorOptions.PARTITION_DEFAULT_NAME.key(),
-                                        FileSystemConnectorOptions.PARTITION_DEFAULT_NAME
-                                                .defaultValue());
-
-                return OrcColumnarRowFileInputFormat.createPartitionedFormat(
-                        OrcShim.defaultShim(),
-                        conf,
-                        tableType,
-                        context.getCatalogTable().getPartitionKeys(),
-                        PartitionFieldExtractor.forFileSystem(defaultPartName),
-                        selectedFields,
-                        orcPredicates,
-                        VectorizedColumnBatch.DEFAULT_SIZE);
-            }
-
-            @Override
-            public ChangelogMode getChangelogMode() {
-                return ChangelogMode.insertOnly();
-            }
-
-            @Override
-            public void applyFilters(List<ResolvedExpression> filters) {
-                this.filters = filters;
-            }
-        };
+        return new OrcBulkDecodingFormat(formatOptions);
     }
 
     @Override
@@ -177,4 +114,57 @@ public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
             }
         };
     }
+
+    private static class OrcBulkDecodingFormat
+            implements BulkDecodingFormat<RowData>,
+                    ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>> {
+
+        private final ReadableConfig formatOptions;
+        private List<ResolvedExpression> filters;
+
+        public OrcBulkDecodingFormat(ReadableConfig formatOptions) {
+            this.formatOptions = formatOptions;
+        }
+
+        @Override
+        public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
+                DynamicTableSource.Context sourceContext,
+                DataType producedDataType,
+                int[][] projections) {
+            List<OrcFilters.Predicate> orcPredicates = new ArrayList<>();
+
+            if (filters != null) {
+                for (Expression pred : filters) {
+                    OrcFilters.Predicate orcPred = OrcFilters.toOrcPredicate(pred);
+                    if (orcPred != null) {
+                        orcPredicates.add(orcPred);
+                    }
+                }
+            }
+
+            Properties properties = getOrcProperties(formatOptions);
+            Configuration conf = new Configuration();
+            properties.forEach((k, v) -> conf.set(k.toString(), v.toString()));
+
+            return OrcColumnarRowFileInputFormat.createPartitionedFormat(
+                    OrcShim.defaultShim(),
+                    conf,
+                    (RowType) producedDataType.getLogicalType(),
+                    Collections.emptyList(),
+                    null,
+                    Projection.of(projections).toTopLevelIndexes(),
+                    orcPredicates,
+                    VectorizedColumnBatch.DEFAULT_SIZE);
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode() {
+            return ChangelogMode.insertOnly();
+        }
+
+        @Override
+        public void applyFilters(List<ResolvedExpression> filters) {
+            this.filters = filters;
+        }
+    }
 }
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java
index bf28661..f8e99f2c 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java
@@ -27,6 +27,7 @@ import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.BulkDecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
@@ -34,13 +35,12 @@ import org.apache.flink.table.data.vector.VectorizedColumnBatch;
 import org.apache.flink.table.factories.BulkReaderFormatFactory;
 import org.apache.flink.table.factories.BulkWriterFormatFactory;
 import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.filesystem.FileSystemConnectorOptions;
-import org.apache.flink.table.filesystem.PartitionFieldExtractor;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.hadoop.conf.Configuration;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
@@ -64,32 +64,7 @@ public class ParquetFileFormatFactory implements BulkReaderFormatFactory, BulkWr
     @Override
     public BulkDecodingFormat<RowData> createDecodingFormat(
             DynamicTableFactory.Context context, ReadableConfig formatOptions) {
-        return new BulkDecodingFormat<RowData>() {
-            @Override
-            public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
-                    DynamicTableSource.Context sourceContext, DataType producedDataType) {
-                String defaultPartName =
-                        context.getCatalogTable()
-                                .getOptions()
-                                .getOrDefault(
-                                        FileSystemConnectorOptions.PARTITION_DEFAULT_NAME.key(),
-                                        FileSystemConnectorOptions.PARTITION_DEFAULT_NAME
-                                                .defaultValue());
-                return ParquetColumnarRowInputFormat.createPartitionedFormat(
-                        getParquetConfiguration(formatOptions),
-                        (RowType) producedDataType.getLogicalType(),
-                        context.getCatalogTable().getPartitionKeys(),
-                        PartitionFieldExtractor.forFileSystem(defaultPartName),
-                        VectorizedColumnBatch.DEFAULT_SIZE,
-                        formatOptions.get(UTC_TIMEZONE),
-                        true);
-            }
-
-            @Override
-            public ChangelogMode getChangelogMode() {
-                return ChangelogMode.insertOnly();
-            }
-        };
+        return new ParquetBulkDecodingFormat(formatOptions);
     }
 
     @Override
@@ -134,4 +109,37 @@ public class ParquetFileFormatFactory implements BulkReaderFormatFactory, BulkWr
     public Set<ConfigOption<?>> optionalOptions() {
         return new HashSet<>();
     }
+
+    private static class ParquetBulkDecodingFormat
+            implements ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>,
+                    BulkDecodingFormat<RowData> {
+
+        private final ReadableConfig formatOptions;
+
+        public ParquetBulkDecodingFormat(ReadableConfig formatOptions) {
+            this.formatOptions = formatOptions;
+        }
+
+        @Override
+        public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
+                DynamicTableSource.Context sourceContext,
+                DataType producedDataType,
+                int[][] projections) {
+
+            return ParquetColumnarRowInputFormat.createPartitionedFormat(
+                    getParquetConfiguration(formatOptions),
+                    (RowType)
+                            DataType.projectFields(producedDataType, projections).getLogicalType(),
+                    Collections.emptyList(),
+                    null,
+                    VectorizedColumnBatch.DEFAULT_SIZE,
+                    formatOptions.get(UTC_TIMEZONE),
+                    true);
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode() {
+            return ChangelogMode.insertOnly();
+        }
+    }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/Projection.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/Projection.java
new file mode 100644
index 0000000..e2758aa
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/Projection.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * {@link Projection} represents a list of (possibly nested) indexes that can be used to project
+ * data types. A row projection includes both reducing the accessible fields and reordering them.
+ */
+@PublicEvolving
+public abstract class Projection {
+
+    // sealed class
+    private Projection() {}
+
+    /** Project the provided {@link DataType} using this {@link Projection}. */
+    public abstract DataType project(DataType dataType);
+
+    /** @return {@code true} if this projection is nested, {@code false} otherwise. */
+    public abstract boolean isNested();
+
+    /**
+     * Perform a difference of this {@link Projection} with another {@link Projection}. The result
+     * of this operation is a new {@link Projection} retaining the same ordering of this instance
+     * but with the indexes from {@code other} removed. For example:
+     *
+     * <pre>
+     * <code>
+     * [4, 1, 0, 3, 2] - [4, 2] = [1, 0, 2]
+     * </code>
+     * </pre>
+     *
+     * <p>Note how the index {@code 3} in the minuend becomes {@code 2} because it's rescaled to
+     * correctly project a {@link RowData} of arity 3.
+     *
+     * @param other the subtrahend
+     * @throws IllegalArgumentException when {@code other} is nested.
+     */
+    public abstract Projection difference(Projection other);
+
+    /**
+     * Complement this projection. The returned projection is an ordered projection of fields in
+     * {@code [0, fieldsNumber)} except the indexes in this {@link Projection}. For example:
+     *
+     * <pre>
+     * <code>
+     * [4, 2].complement(5) = [0, 1, 3]
+     * </code>
+     * </pre>
+     *
+     * @param fieldsNumber the size of the universe
+     * @throws IllegalStateException if this projection is nested.
+     */
+    public abstract Projection complement(int fieldsNumber);
+
+    /** Like {@link #complement(int)}, using the {@code dataType} fields count. */
+    public Projection complement(DataType dataType) {
+        return complement(DataType.getFieldCount(dataType));
+    }
+
+    /**
+     * Convert this instance to a projection of top level indexes. The array represents the mapping
+     * of the fields of the original {@link DataType}. For example, {@code [0, 2, 1]} specifies to
+     * include in the following order the 1st field, the 3rd field and the 2nd field of the row.
+     *
+     * @throws IllegalStateException if this projection is nested.
+     */
+    public abstract int[] toTopLevelIndexes();
+
+    /**
+     * Convert this instance to nested projection index paths. The array represents the mapping of
+     * the fields of the original {@link DataType}, including nested rows. For example, {@code [[0,
+     * 2, 1], ...]} specifies to include the 2nd field of the 3rd field of the 1st field in the
+     * top-level row.
+     */
+    public abstract int[][] toNestedIndexes();
+
+    /**
+     * Create an empty {@link Projection}, that is a projection that projects no fields, returning
+     * an empty {@link DataType}.
+     */
+    public static Projection empty() {
+        return EmptyProjection.INSTANCE;
+    }
+
+    /**
+     * Create a {@link Projection} of the provided {@code indexes}.
+     *
+     * @see #toTopLevelIndexes()
+     */
+    public static Projection of(int[] indexes) {
+        if (indexes.length == 0) {
+            return empty();
+        }
+        return new TopLevelProjection(indexes);
+    }
+
+    /**
+     * Create a {@link Projection} of the provided {@code indexes}.
+     *
+     * @see #toNestedIndexes()
+     */
+    public static Projection of(int[][] indexes) {
+        if (indexes.length == 0) {
+            return empty();
+        }
+        return new NestedProjection(indexes);
+    }
+
+    /**
+     * Create a {@link Projection} of the provided {@code dataType} using the provided {@code
+     * projectedFields}.
+     *
+     * @param dataType the type whose field names are used to resolve the indexes
+     * @param projectedFields names of the fields to project, in the desired output order
+     * @throws IllegalArgumentException if a name in {@code projectedFields} is not a field of
+     *     {@code dataType}
+     */
+    public static Projection fromFieldNames(DataType dataType, List<String> projectedFields) {
+        List<String> dataTypeFieldNames = DataType.getFieldNames(dataType);
+        int[] indexes = new int[projectedFields.size()];
+        for (int i = 0; i < indexes.length; i++) {
+            String fieldName = projectedFields.get(i);
+            int index = dataTypeFieldNames.indexOf(fieldName);
+            if (index < 0) {
+                // Fail fast instead of building a projection that contains the index -1
+                throw new IllegalArgumentException(
+                        "Field '" + fieldName + "' not found in fields " + dataTypeFieldNames);
+            }
+            indexes[i] = index;
+        }
+        return new TopLevelProjection(indexes);
+    }
+
+    /** Create a {@link Projection} of all the fields in the provided {@code dataType}. */
+    public static Projection all(DataType dataType) {
+        return new TopLevelProjection(
+                IntStream.range(0, DataType.getFieldCount(dataType)).toArray());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof Projection)) {
+            return false;
+        }
+        Projection other = (Projection) o;
+        if (!this.isNested() && !other.isNested()) {
+            return Arrays.equals(this.toTopLevelIndexes(), other.toTopLevelIndexes());
+        }
+        return Arrays.deepEquals(this.toNestedIndexes(), other.toNestedIndexes());
+    }
+
+    @Override
+    public int hashCode() {
+        if (isNested()) {
+            return Arrays.deepHashCode(toNestedIndexes());
+        }
+        return Arrays.hashCode(toTopLevelIndexes());
+    }
+
+    @Override
+    public String toString() {
+        if (isNested()) {
+            return "Nested projection = " + Arrays.deepToString(toNestedIndexes());
+        }
+        return "Top level projection = " + Arrays.toString(toTopLevelIndexes());
+    }
+
+    private static class EmptyProjection extends Projection {
+
+        static final EmptyProjection INSTANCE = new EmptyProjection();
+
+        private EmptyProjection() {}
+
+        @Override
+        public DataType project(DataType dataType) {
+            return DataType.projectFields(dataType, toTopLevelIndexes());
+        }
+
+        @Override
+        public boolean isNested() {
+            return false;
+        }
+
+        @Override
+        public Projection difference(Projection projection) {
+            return this;
+        }
+
+        @Override
+        public Projection complement(int fieldsNumber) {
+            return new TopLevelProjection(IntStream.range(0, fieldsNumber).toArray());
+        }
+
+        @Override
+        public int[] toTopLevelIndexes() {
+            return new int[0];
+        }
+
+        @Override
+        public int[][] toNestedIndexes() {
+            return new int[0][];
+        }
+    }
+
+    private static class NestedProjection extends Projection {
+
+        final int[][] projection;
+        final boolean nested;
+
+        NestedProjection(int[][] projection) {
+            this.projection = projection;
+            this.nested = Arrays.stream(projection).anyMatch(arr -> arr.length > 1);
+        }
+
+        @Override
+        public DataType project(DataType dataType) {
+            return DataType.projectFields(dataType, projection);
+        }
+
+        @Override
+        public boolean isNested() {
+            return nested;
+        }
+
+        /**
+         * Removes the top-level indexes of {@code other} from this projection and rescales the
+         * first index of every remaining path so that it addresses a row without the excluded
+         * fields. The order of the remaining paths is preserved and the index arrays held by this
+         * instance are not modified.
+         *
+         * @param other the subtrahend; must not be nested
+         * @throws IllegalArgumentException when {@code other} is nested
+         */
+        @Override
+        public Projection difference(Projection other) {
+            if (other.isNested()) {
+                throw new IllegalArgumentException(
+                        "Cannot perform difference between nested projection and nested projection");
+            }
+            if (other instanceof EmptyProjection) {
+                return this;
+            }
+            if (!this.isNested()) {
+                return new TopLevelProjection(toTopLevelIndexes()).difference(other);
+            }
+
+            // Extract the indexes to exclude and sort them
+            int[] indexesToExclude = other.toTopLevelIndexes();
+            indexesToExclude = Arrays.copyOf(indexesToExclude, indexesToExclude.length);
+            Arrays.sort(indexesToExclude);
+
+            // Shallow copy of the list of paths: the int[] elements are still the ones owned by
+            // this instance, so they must never be written to in the loop below.
+            List<int[]> resultProjection =
+                    Arrays.stream(projection).collect(Collectors.toCollection(ArrayList::new));
+
+            ListIterator<int[]> resultProjectionIterator = resultProjection.listIterator();
+            while (resultProjectionIterator.hasNext()) {
+                int[] indexArr = resultProjectionIterator.next();
+
+                // Let's check if the index is inside the indexesToExclude array
+                int searchResult = Arrays.binarySearch(indexesToExclude, indexArr[0]);
+                if (searchResult >= 0) {
+                    // Found, we need to remove it
+                    resultProjectionIterator.remove();
+                } else {
+                    // Not found, let's compute the offset.
+                    // Offset is the index where the projection index should be inserted in the
+                    // indexesToExclude array
+                    int offset = (-(searchResult) - 1);
+                    if (offset != 0) {
+                        // Rescale a copy of the path and swap it into the result list, leaving
+                        // the array referenced by this.projection untouched.
+                        int[] rescaled = indexArr.clone();
+                        rescaled[0] -= offset;
+                        resultProjectionIterator.set(rescaled);
+                    }
+                }
+            }
+
+            return new NestedProjection(resultProjection.toArray(new int[0][]));
+        }
+
+        @Override
+        public Projection complement(int fieldsNumber) {
+            if (isNested()) {
+                throw new IllegalStateException("Cannot perform complement of a nested projection");
+            }
+            return new TopLevelProjection(toTopLevelIndexes()).complement(fieldsNumber);
+        }
+
+        @Override
+        public int[] toTopLevelIndexes() {
+            if (isNested()) {
+                throw new IllegalStateException(
+                        "Cannot convert a nested projection to a top level projection");
+            }
+            return Arrays.stream(projection).mapToInt(arr -> arr[0]).toArray();
+        }
+
+        @Override
+        public int[][] toNestedIndexes() {
+            return projection;
+        }
+    }
+
+    private static class TopLevelProjection extends Projection {
+
+        final int[] projection;
+
+        TopLevelProjection(int[] projection) {
+            this.projection = projection;
+        }
+
+        @Override
+        public DataType project(DataType dataType) {
+            return DataType.projectFields(dataType, this.projection);
+        }
+
+        @Override
+        public boolean isNested() {
+            return false;
+        }
+
+        @Override
+        public Projection difference(Projection other) {
+            if (other.isNested()) {
+                throw new IllegalArgumentException(
+                        "Cannot perform difference between top level projection and nested projection");
+            }
+            if (other instanceof EmptyProjection) {
+                return this;
+            }
+
+            // Extract the indexes to exclude and sort them
+            int[] indexesToExclude = other.toTopLevelIndexes();
+            indexesToExclude = Arrays.copyOf(indexesToExclude, indexesToExclude.length);
+            Arrays.sort(indexesToExclude);
+
+            List<Integer> resultProjection =
+                    Arrays.stream(projection)
+                            .boxed()
+                            .collect(Collectors.toCollection(ArrayList::new));
+
+            ListIterator<Integer> resultProjectionIterator = resultProjection.listIterator();
+            while (resultProjectionIterator.hasNext()) {
+                int index = resultProjectionIterator.next();
+
+                // Let's check if the index is inside the indexesToExclude array
+                int searchResult = Arrays.binarySearch(indexesToExclude, index);
+                if (searchResult >= 0) {
+                    // Found, we need to remove it
+                    resultProjectionIterator.remove();
+                } else {
+                    // Not found, let's compute the offset.
+                    // Offset is the index where the projection index should be inserted in the
+                    // indexesToExclude array
+                    int offset = (-(searchResult) - 1);
+                    if (offset != 0) {
+                        resultProjectionIterator.set(index - offset);
+                    }
+                }
+            }
+
+            return new TopLevelProjection(resultProjection.stream().mapToInt(i -> i).toArray());
+        }
+
+        @Override
+        public Projection complement(int fieldsNumber) {
+            int[] indexesToExclude = Arrays.copyOf(projection, projection.length);
+            Arrays.sort(indexesToExclude);
+
+            return new TopLevelProjection(
+                    IntStream.range(0, fieldsNumber)
+                            .filter(i -> Arrays.binarySearch(indexesToExclude, i) < 0)
+                            .toArray());
+        }
+
+        @Override
+        public int[] toTopLevelIndexes() {
+            return projection;
+        }
+
+        @Override
+        public int[][] toNestedIndexes() {
+            return Arrays.stream(projection).mapToObj(i -> new int[] {i}).toArray(int[][]::new);
+        }
+    }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/DecodingFormat.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/DecodingFormat.java
index ffbe672..71061c6 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/DecodingFormat.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/DecodingFormat.java
@@ -19,8 +19,13 @@
 package org.apache.flink.table.connector.format;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.types.DataType;
 
 import java.util.Collections;
@@ -30,6 +35,53 @@ import java.util.Map;
 /**
  * A {@link Format} for a {@link DynamicTableSource} for reading rows.
  *
+ * <h1>Implementing a {@link DecodingFormat}</h1>
+ *
+ * {@link DecodingFormat#createRuntimeDecoder(DynamicTableSource.Context, DataType)} takes a {@code
+ * physicalDataType}. This {@link DataType} has usually been derived from a table's {@link
+ * ResolvedSchema} and excludes partition, metadata, and other auxiliary columns. The {@code
+ * physicalDataType} should describe exactly the full serialized record. In other words: for every
+ * field in the serialized record there is a corresponding field at the same position in the {@code
+ * physicalDataType}. Some implementations may decide to be more lenient and allow users to omit
+ * fields but this depends on the format characteristics. For example, a CSV format implementation
+ * might allow the user to define the schema only for the first 5 of the 10 total columns available
+ * in each row.
+ *
+ * <p>If the format supports projections, that is it can exclude certain fields from being parsed
+ * <b>independently of the fields defined in the schema</b> and <b>can reorder fields</b> in the
+ * produced {@link RowData}, then it should implement {@link ProjectableDecodingFormat}. {@link
+ * ProjectableDecodingFormat#createRuntimeDecoder(DynamicTableSource.Context, DataType, int[][])}
+ * provides the {@code physicalDataType} as described above and provides {@code projections} to
+ * compute the type to produce using {@code DataType.projectFields(physicalDataType, projections)}.
+ * For example, a JSON format implementation may match the fields based on the JSON object keys,
+ * hence it can easily produce {@link RowData} excluding unused object values and set values inside
+ * the {@link RowData} using the index provided by the {@code projections} array.
+ *
+ * <p>Whenever possible, it's highly recommended to implement {@link ProjectableDecodingFormat}, as
+ * it might help to reduce the data volume when users are reading large records but are using only a
+ * small subset of fields.
+ *
+ * <h1>Using a {@link DecodingFormat}</h1>
+ *
+ * A {@link DynamicTableSource} that doesn't implement {@link SupportsProjectionPushDown} should
+ * invoke {@link DecodingFormat#createRuntimeDecoder(DynamicTableSource.Context, DataType)}.
+ * Usually, {@link DynamicTableFactory.Context#getPhysicalRowDataType()} can provide the {@code
+ * physicalDataType} (stripped of any fields not available in the serialized record).
+ *
+ * <p>A {@link DynamicTableSource} implementing {@link SupportsProjectionPushDown} should check
+ * whether the {@link DecodingFormat} is an instance of {@link ProjectableDecodingFormat}:
+ *
+ * <ul>
+ *   <li>If yes, then the connector can invoke {@link
+ *       ProjectableDecodingFormat#createRuntimeDecoder(DynamicTableSource.Context, DataType,
+ *       int[][])} providing a non-null {@code projections} array excluding auxiliary fields. The
+ *       built runtime implementation will take care of projections, producing records of type
+ *       {@code DataType.projectFields(physicalDataType, projections)}.
+ *   <li>If no, then the connector must take care of performing the projection, for example using
+ *       {@link ProjectedRowData} to project physical {@link RowData} emitted from the decoder
+ *       runtime implementation.
+ * </ul>
+ *
  * @param <I> runtime interface needed by the table source
  */
 @PublicEvolving
@@ -38,6 +90,10 @@ public interface DecodingFormat<I> extends Format {
     /**
      * Creates runtime decoder implementation that is configured to produce data of the given data
      * type.
+     *
+     * @param context the context provides several utilities required to instantiate the runtime
+     *     decoder implementation of the format
+     * @param physicalDataType For more details check the documentation of {@link DecodingFormat}.
      */
     I createRuntimeDecoder(DynamicTableSource.Context context, DataType physicalDataType);
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ProjectableDecodingFormat.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ProjectableDecodingFormat.java
new file mode 100644
index 0000000..467ebf1
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ProjectableDecodingFormat.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.format;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.types.DataType;
+
+/**
+ * Extension of {@link DecodingFormat} which is able to produce projected rows.
+ *
+ * <p>For more details on usage and differences between {@link DecodingFormat} and {@link
+ * ProjectableDecodingFormat}, check the documentation of {@link DecodingFormat}.
+ *
+ * @see Projection
+ * @see ProjectedRowData
+ */
+@PublicEvolving
+public interface ProjectableDecodingFormat<I> extends DecodingFormat<I> {
+
+    /** Returns whether this format supports nested projection. */
+    default boolean supportsNestedProjection() {
+        return false;
+    }
+
+    /**
+     * Creates runtime decoder implementation that is configured to produce data of type {@code
+     * DataType.projectFields(physicalDataType, projections)}. For more details on the usage, check
+     * {@link DecodingFormat} documentation.
+     *
+     * @param context the context provides several utilities required to instantiate the runtime
+     *     decoder implementation of the format
+     * @param physicalDataType For more details check {@link DecodingFormat}
+     * @param projections the projections array. The array represents the mapping of the fields of
+     *     the original {@link DataType}, including nested rows. For example, {@code [[0, 2, 1],
+     *     ...]} specifies to include the 2nd field of the 3rd field of the 1st field in the
+     *     top-level row. It's guaranteed that this array won't contain nested projections if {@link
+     *     #supportsNestedProjection()} returns {@code false}. For more details, check {@link
+     *     Projection} as well.
+     * @return the runtime decoder
+     * @see DecodingFormat
+     */
+    I createRuntimeDecoder(
+            DynamicTableSource.Context context, DataType physicalDataType, int[][] projections);
+
+    /**
+     * Creates a runtime decoder that produces all the fields of {@code projectedPhysicalDataType}
+     * in their original order, by delegating to the projecting variant of this method with the
+     * indexes of {@link Projection#all(DataType)}.
+     *
+     * @param context the context provides several utilities required to instantiate the runtime
+     *     decoder implementation of the format
+     * @param projectedPhysicalDataType the data type to produce; every field of it is selected
+     * @return the runtime decoder
+     */
+    @Override
+    default I createRuntimeDecoder(
+            DynamicTableSource.Context context, DataType projectedPhysicalDataType) {
+        return createRuntimeDecoder(
+                context,
+                projectedPhysicalDataType,
+                Projection.all(projectedPhysicalDataType).toNestedIndexes());
+    }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsProjectionPushDown.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsProjectionPushDown.java
index c63cedb..5022eeb 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsProjectionPushDown.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsProjectionPushDown.java
@@ -21,7 +21,9 @@ package org.apache.flink.table.connector.source.abilities;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.utils.ProjectedRowData;
 import org.apache.flink.table.types.DataType;
 
 /**
@@ -45,6 +47,9 @@ import org.apache.flink.table.types.DataType;
  * different field order). It does not contain any computation. A projection can either be performed
  * on the fields of the top-level row only or consider nested fields as well (see {@link
  * #supportsNestedProjection()}).
+ *
+ * @see Projection
+ * @see ProjectedRowData
  */
 @PublicEvolving
 public interface SupportsProjectionPushDown {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java
new file mode 100644
index 0000000..196796a
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+
+/**
+ * An implementation of {@link RowData} which provides a projected view of the underlying {@link
+ * RowData}.
+ *
+ * <p>Projection includes both reducing the accessible fields and reordering them.
+ *
+ * <p>Note: This class supports only top-level projections, not nested projections.
+ */
+@PublicEvolving
+public class ProjectedRowData implements RowData {
+
+    private final int[] indexMapping;
+
+    private RowData row;
+
+    private ProjectedRowData(int[] indexMapping) {
+        this.indexMapping = indexMapping;
+    }
+
+    /**
+     * Replaces the underlying {@link RowData} backing this {@link ProjectedRowData}.
+     *
+     * <p>This method replaces the row data in place and does not return a new object. This is done
+     * for performance reasons.
+     */
+    public ProjectedRowData replaceRow(RowData row) {
+        this.row = row;
+        return this;
+    }
+
+    // ---------------------------------------------------------------------------------------------
+
+    @Override
+    public int getArity() {
+        return indexMapping.length;
+    }
+
+    @Override
+    public RowKind getRowKind() {
+        return row.getRowKind();
+    }
+
+    @Override
+    public void setRowKind(RowKind kind) {
+        row.setRowKind(kind);
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        return row.isNullAt(indexMapping[pos]);
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return row.getBoolean(indexMapping[pos]);
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        return row.getByte(indexMapping[pos]);
+    }
+
+    @Override
+    public short getShort(int pos) {
+        return row.getShort(indexMapping[pos]);
+    }
+
+    @Override
+    public int getInt(int pos) {
+        return row.getInt(indexMapping[pos]);
+    }
+
+    @Override
+    public long getLong(int pos) {
+        return row.getLong(indexMapping[pos]);
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return row.getFloat(indexMapping[pos]);
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return row.getDouble(indexMapping[pos]);
+    }
+
+    @Override
+    public StringData getString(int pos) {
+        return row.getString(indexMapping[pos]);
+    }
+
+    @Override
+    public DecimalData getDecimal(int pos, int precision, int scale) {
+        return row.getDecimal(indexMapping[pos], precision, scale);
+    }
+
+    @Override
+    public TimestampData getTimestamp(int pos, int precision) {
+        return row.getTimestamp(indexMapping[pos], precision);
+    }
+
+    @Override
+    public <T> RawValueData<T> getRawValue(int pos) {
+        return row.getRawValue(indexMapping[pos]);
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        return row.getBinary(indexMapping[pos]);
+    }
+
+    @Override
+    public ArrayData getArray(int pos) {
+        return row.getArray(indexMapping[pos]);
+    }
+
+    @Override
+    public MapData getMap(int pos) {
+        return row.getMap(indexMapping[pos]);
+    }
+
+    @Override
+    public RowData getRow(int pos, int numFields) {
+        return row.getRow(indexMapping[pos], numFields);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        throw new UnsupportedOperationException("Projected row data cannot be compared");
+    }
+
+    @Override
+    public int hashCode() {
+        throw new UnsupportedOperationException("Projected row data cannot be hashed");
+    }
+
+    /**
+     * Returns a debug representation made of the row kind, the index mapping and the backing row.
+     *
+     * <p>Safe to call before a backing row has been set through {@link #replaceRow(RowData)}; in
+     * that case no row kind is printed.
+     */
+    @Override
+    public String toString() {
+        // The backing row stays null until replaceRow(...) is invoked, so guard the dereference.
+        String rowKind = row == null ? "" : row.getRowKind().shortString();
+        return rowKind
+                + "{"
+                + "indexMapping="
+                + Arrays.toString(indexMapping)
+                + ", mutableRow="
+                + row
+                + '}';
+    }
+
+    /**
+     * Like {@link #from(int[])}, but throws {@link IllegalArgumentException} if the provided {@code
+     * projection} array contains nested projections, which are not supported by {@link
+     * ProjectedRowData}.
+     *
+     * <p>The array represents the mapping of the fields of the original {@link DataType}, including
+     * nested rows. For example, {@code [[0, 2, 1], ...]} specifies to include the 2nd field of the
+     * 3rd field of the 1st field in the top-level row.
+     *
+     * @see Projection
+     * @see ProjectedRowData
+     */
+    public static ProjectedRowData from(int[][] projection) throws IllegalArgumentException {
+        return new ProjectedRowData(
+                Arrays.stream(projection)
+                        .mapToInt(
+                                arr -> {
+                                    if (arr.length != 1) {
+                                        throw new IllegalArgumentException(
+                                                "ProjectedRowData doesn't support nested projections");
+                                    }
+                                    return arr[0];
+                                })
+                        .toArray());
+    }
+
+    /**
+     * Create an empty {@link ProjectedRowData} starting from a {@code projection} array.
+     *
+     * <p>The array represents the mapping of the fields of the original {@link DataType}. For
+     * example, {@code [0, 2, 1]} specifies to include in the following order the 1st field, the 3rd
+     * field and the 2nd field of the row.
+     *
+     * @see Projection
+     * @see ProjectedRowData
+     */
+    public static ProjectedRowData from(int[] projection) {
+        return new ProjectedRowData(projection);
+    }
+
+    /**
+     * Create an empty {@link ProjectedRowData} starting from a {@link Projection}.
+     *
+     * <p>Throws {@link IllegalStateException} if the provided {@code projection} array contains
+     * nested projections, which are not supported by {@link ProjectedRowData}.
+     *
+     * @see Projection
+     * @see ProjectedRowData
+     */
+    public static ProjectedRowData from(Projection projection) {
+        return new ProjectedRowData(projection.toTopLevelIndexes());
+    }
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/ProjectionTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/ProjectionTest.java
new file mode 100644
index 0000000..5b3afba
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/ProjectionTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector;
+
+import org.apache.flink.table.types.DataType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link Projection}: projecting row types, {@code difference} and {@code complement},
+ * and the conversions between top-level and nested index representations.
+ */
+class ProjectionTest {
+
+    @Test
+    void testTopLevelProject() {
+        // {2, 1} picks the 3rd field followed by the 2nd one, keeping their names and types.
+        final DataType sourceRow =
+                ROW(FIELD("f0", BIGINT()), FIELD("f1", STRING()), FIELD("f2", INT()));
+        final DataType expectedRow = ROW(FIELD("f2", INT()), FIELD("f1", STRING()));
+
+        assertEquals(expectedRow, Projection.of(new int[] {2, 1}).project(sourceRow));
+    }
+
+    @Test
+    void testNestedProject() {
+        final DataType innermost =
+                ROW(FIELD("c0", BOOLEAN()), FIELD("c1", DOUBLE()), FIELD("c2", INT()));
+        final DataType middle =
+                ROW(FIELD("b0", BOOLEAN()), FIELD("b1", innermost), FIELD("b2", INT()));
+        final DataType outer =
+                ROW(FIELD("a0", INT()), FIELD("a1", middle), FIELD("a1_b1_c0", INT()));
+
+        // A nested leaf is named after the underscore-joined field names on its path.
+        final DataType withLeaf = Projection.of(new int[][] {{0}, {1, 1, 0}}).project(outer);
+        assertEquals(ROW(FIELD("a0", INT()), FIELD("a1_b1_c0", BOOLEAN())), withLeaf);
+
+        // A path may stop at an intermediate row, which is then selected as a whole.
+        final DataType withSubRow = Projection.of(new int[][] {{1, 1}, {0}}).project(outer);
+        assertEquals(ROW(FIELD("a1_b1", innermost), FIELD("a0", INT())), withSubRow);
+
+        // Leaves of the same row can be selected in any order.
+        final DataType reversed =
+                Projection.of(new int[][] {{1, 1, 2}, {1, 1, 1}, {1, 1, 0}}).project(outer);
+        assertEquals(
+                ROW(
+                        FIELD("a1_b1_c2", INT()),
+                        FIELD("a1_b1_c1", DOUBLE()),
+                        FIELD("a1_b1_c0", BOOLEAN())),
+                reversed);
+
+        // When a generated name clashes with another selected field, the later field is
+        // renamed with a "_$0" suffix.
+        final DataType renamed = Projection.of(new int[][] {{1, 1, 0}, {2}}).project(outer);
+        assertEquals(ROW(FIELD("a1_b1_c0", BOOLEAN()), FIELD("a1_b1_c0_$0", INT())), renamed);
+    }
+
+    @Test
+    void testIsNested() {
+        // One-element index paths still count as top level; a longer path means nested.
+        assertFalse(Projection.of(new int[] {2, 1}).isNested());
+        assertFalse(Projection.of(new int[][] {{1}, {3}}).isNested());
+        assertTrue(Projection.of(new int[][] {{1}, {1, 2}, {3}}).isNested());
+    }
+
+    @Test
+    void testDifference() {
+        // Top level: fields 4 and 2 are dropped and the survivors {1, 0, 3} are re-indexed
+        // against the reduced row, hence {1, 0, 2}.
+        final Projection topLevel = Projection.of(new int[] {4, 1, 0, 3, 2});
+        assertEquals(
+                Projection.of(new int[] {1, 0, 2}),
+                topLevel.difference(Projection.of(new int[] {4, 2})));
+
+        // Nested: only the leading index of each path is re-indexed, so {3, 1} becomes
+        // {2, 1} while {1, 3} and {0} stay as they are.
+        final Projection nested = Projection.of(new int[][] {{4}, {1, 3}, {0}, {3, 1}, {2}});
+        assertEquals(
+                Projection.of(new int[][] {{1, 3}, {0}, {2, 1}}),
+                nested.difference(Projection.of(new int[] {4, 2})));
+
+        // The subtracted projection must be top level: a nested one is rejected.
+        final Projection topLevelMinuend = Projection.of(new int[] {1, 2, 3, 4});
+        final Projection nestedSubtrahend = Projection.of(new int[][] {{2}, {3, 4}});
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> topLevelMinuend.difference(nestedSubtrahend));
+    }
+
+    @Test
+    void testComplement() {
+        // With 5 fields and {4, 1, 2} selected, the unselected fields are {0, 3}.
+        final Projection topLevel = Projection.of(new int[] {4, 1, 2});
+        assertEquals(Projection.of(new int[] {0, 3}), topLevel.complement(5));
+
+        // One-element index paths behave like top-level indexes.
+        final Projection singleElementPaths = Projection.of(new int[][] {{4}, {1}, {2}});
+        assertEquals(Projection.of(new int[] {0, 3}), singleElementPaths.complement(5));
+
+        // The complement of a nested projection is rejected.
+        final Projection nested = Projection.of(new int[][] {{4}, {1, 3}, {2}});
+        assertThrows(IllegalStateException.class, () -> nested.complement(10));
+    }
+
+    @Test
+    void testToTopLevelIndexes() {
+        // A top-level projection hands back the indexes it was built from.
+        final Projection topLevel = Projection.of(new int[] {1, 2, 3, 4});
+        assertArrayEquals(new int[] {1, 2, 3, 4}, topLevel.toTopLevelIndexes());
+
+        // One-element index paths are flattened to their only index.
+        final Projection singleElementPaths = Projection.of(new int[][] {{4}, {1}, {2}});
+        assertArrayEquals(new int[] {4, 1, 2}, singleElementPaths.toTopLevelIndexes());
+
+        // A path with more than one index cannot be flattened.
+        final Projection nested = Projection.of(new int[][] {{4}, {1, 3}, {2}});
+        assertThrows(IllegalStateException.class, () -> nested.toTopLevelIndexes());
+    }
+
+    @Test
+    void testToNestedIndexes() {
+        // Every top-level index becomes a one-element path.
+        final Projection topLevel = Projection.of(new int[] {1, 2, 3, 4});
+        assertArrayEquals(new int[][] {{1}, {2}, {3}, {4}}, topLevel.toNestedIndexes());
+
+        // Index paths that are already nested come back unchanged.
+        final Projection nested = Projection.of(new int[][] {{4}, {1, 3}, {2}});
+        assertArrayEquals(new int[][] {{4}, {1, 3}, {2}}, nested.toNestedIndexes());
+    }
+
+    @Test
+    void testEquals() {
+        // Top-level indexes and their one-element-path form describe the same projection.
+        final Projection topLevel = Projection.of(new int[] {1, 2, 3});
+        final Projection singleElementPaths = Projection.of(new int[][] {{1}, {2}, {3}});
+        assertEquals(topLevel, singleElementPaths);
+    }
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/utils/ProjectedRowDataTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/utils/ProjectedRowDataTest.java
new file mode 100644
index 0000000..9bb2b05
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/utils/ProjectedRowDataTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.utils;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Tests for {@link ProjectedRowData}. */
+public class ProjectedRowDataTest {
+
+    @Test
+    public void testProjectedRows() {
+        final RowData initialRow = GenericRowData.of(0L, 1L, 2L, 3L, 4L);
+        final ProjectedRowData projectedRowData =
+                ProjectedRowData.from(
+                        new int[][] {new int[] {2}, new int[] {0}, new int[] {1}, new int[] {4}});
+        projectedRowData.replaceRow(initialRow);
+        // Assert on the projected view itself (not on the fixture): kind, arity, positions.
+        assertEquals(RowKind.INSERT, projectedRowData.getRowKind());
+        assertEquals(4, projectedRowData.getArity());
+        assertEquals(2L, projectedRowData.getLong(0));
+        assertEquals(0L, projectedRowData.getLong(1));
+        assertEquals(1L, projectedRowData.getLong(2));
+        assertEquals(4L, projectedRowData.getLong(3));
+        // Swapping in a wider backing row must keep the same projection and arity.
+        projectedRowData.replaceRow(GenericRowData.of(5L, 6L, 7L, 8L, 9L, 10L));
+        assertEquals(4, projectedRowData.getArity());
+        assertEquals(7L, projectedRowData.getLong(0));
+        assertEquals(5L, projectedRowData.getLong(1));
+        assertEquals(6L, projectedRowData.getLong(2));
+        assertEquals(9L, projectedRowData.getLong(3));
+    }
+
+    @Test
+    public void testProjectedRowsDoesntSupportNestedProjections() {
+        assertThrows(
+                IllegalArgumentException.class,
+                () ->
+                        ProjectedRowData.from(
+                                new int[][] {
+                                    new int[] {2}, new int[] {0, 1}, new int[] {1}, new int[] {4}
+                                }));
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileInfoExtractorBulkFormat.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileInfoExtractorBulkFormat.java
index 58c7b49..5dcf0ff 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileInfoExtractorBulkFormat.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileInfoExtractorBulkFormat.java
@@ -30,8 +30,6 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.PartitionPathUtils;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.ArrayList;
@@ -163,37 +161,11 @@ class FileInfoExtractorBulkFormat implements BulkFormat<RowData, FileSourceSplit
         final EnrichedRowData producedRowData =
                 new EnrichedRowData(fileInfoRowData, this.extendedRowIndexMapping);
 
-        return new ReaderWrapper(superReader, producedRowData);
-    }
-
-    private static final class ReaderWrapper implements Reader<RowData> {
-
-        private final Reader<RowData> wrappedReader;
-        private final EnrichedRowData producedRowData;
-
-        private ReaderWrapper(Reader<RowData> wrappedReader, EnrichedRowData producedRowData) {
-            this.wrappedReader = wrappedReader;
-            this.producedRowData = producedRowData;
-        }
-
-        @Nullable
-        @Override
-        public RecordIterator<RowData> readBatch() throws IOException {
-            RecordIterator<RowData> iterator = wrappedReader.readBatch();
-            if (iterator == null) {
-                return null;
-            }
-            return new RecordMapperWrapperRecordIterator<>(
-                    iterator,
-                    physicalRowData -> {
-                        producedRowData.replaceMutableRow(physicalRowData);
-                        return producedRowData;
-                    });
-        }
-
-        @Override
-        public void close() throws IOException {
-            this.wrappedReader.close();
-        }
+        return RecordMapperWrapperRecordIterator.wrapReader(
+                superReader,
+                physicalRowData -> {
+                    producedRowData.replaceMutableRow(physicalRowData);
+                    return producedRowData;
+                });
     }
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
index d1b12a7..09ef76f 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
@@ -33,8 +33,10 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.format.BulkDecodingFormat;
 import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceFunctionProvider;
@@ -120,38 +122,37 @@ public class FileSystemTableSource extends AbstractFileSystemTable
             return InputFormatProvider.of(new CollectionInputFormat<>(new ArrayList<>(), null));
         }
 
-        // Physical type is computed from the full data type, filtering out partition and
-        // metadata columns. This type is going to be used by formats to parse the input.
-        List<DataTypes.Field> producedDataTypeFields = DataType.getFields(producedDataType);
-        if (metadataKeys != null && !metadataKeys.isEmpty()) {
-            // If metadata keys are present, then by SupportsReadingMetadata contract all the
-            // metadata columns will be at the end of the producedDataType, so we can just remove
-            // from the list the last metadataKeys.size() fields.
-            producedDataTypeFields =
-                    producedDataTypeFields.subList(
-                            0, producedDataTypeFields.size() - metadataKeys.size());
-        }
-        DataType physicalDataType =
-                producedDataTypeFields.stream()
-                        .filter(f -> partitionKeys == null || !partitionKeys.contains(f.getName()))
-                        .collect(Collectors.collectingAndThen(Collectors.toList(), DataTypes::ROW));
-
         // Resolve metadata and make sure to filter out metadata not in the producedDataType
-        List<String> metadataKeys =
-                (this.metadataKeys == null) ? Collections.emptyList() : this.metadataKeys;
-        metadataKeys =
+        final List<String> metadataKeys =
                 DataType.getFieldNames(producedDataType).stream()
-                        .filter(metadataKeys::contains)
+                        .filter(
+                                ((this.metadataKeys == null)
+                                                ? Collections.emptyList()
+                                                : this.metadataKeys)
+                                        ::contains)
                         .collect(Collectors.toList());
-        List<ReadableFileInfo> metadataToExtract =
+        final List<ReadableFileInfo> metadataToExtract =
                 metadataKeys.stream().map(ReadableFileInfo::resolve).collect(Collectors.toList());
 
         // Filter out partition columns not in producedDataType
-        List<String> partitionKeysToExtract =
+        final List<String> partitionKeysToExtract =
                 DataType.getFieldNames(producedDataType).stream()
                         .filter(this.partitionKeys::contains)
                         .collect(Collectors.toList());
 
+        // Compute the physical projection and the physical data type, that is
+        // the type without partition columns and metadata in the same order of the schema
+        DataType physicalDataType = this.schema.toPhysicalRowDataType();
+        final Projection partitionKeysProjections =
+                Projection.fromFieldNames(physicalDataType, partitionKeysToExtract);
+        final Projection physicalProjections =
+                (projectFields != null
+                                ? Projection.of(projectFields)
+                                : Projection.all(physicalDataType))
+                        .difference(partitionKeysProjections);
+        physicalDataType =
+                partitionKeysProjections.complement(physicalDataType).project(physicalDataType);
+
         // TODO FLINK-19845 old format factory, to be removed soon. The old factory doesn't support
         //  metadata.
         if (formatFactory != null) {
@@ -176,23 +177,56 @@ public class FileSystemTableSource extends AbstractFileSystemTable
                     && filters.size() > 0) {
                 ((BulkDecodingFormat<RowData>) bulkReaderFormat).applyFilters(filters);
             }
-            BulkFormat<RowData, FileSourceSplit> bulkFormat =
+
+            BulkFormat<RowData, FileSourceSplit> format;
+            if (bulkReaderFormat instanceof ProjectableDecodingFormat) {
+                format =
+                        ((ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>)
+                                        bulkReaderFormat)
+                                .createRuntimeDecoder(
+                                        scanContext,
+                                        physicalDataType,
+                                        physicalProjections.toNestedIndexes());
+            } else {
+                format =
+                        new ProjectingBulkFormat(
+                                bulkReaderFormat.createRuntimeDecoder(
+                                        scanContext, physicalDataType),
+                                physicalProjections.toTopLevelIndexes(),
+                                scanContext.createTypeInformation(
+                                        physicalProjections.project(physicalDataType)));
+            }
+
+            format =
                     wrapBulkFormat(
-                            bulkReaderFormat.createRuntimeDecoder(scanContext, physicalDataType),
-                            producedDataType,
-                            metadataToExtract,
-                            partitionKeysToExtract);
-            return createSourceProvider(bulkFormat);
+                            format, producedDataType, metadataToExtract, partitionKeysToExtract);
+            return createSourceProvider(format);
         } else if (deserializationFormat != null) {
-            DeserializationSchema<RowData> decoder =
-                    deserializationFormat.createRuntimeDecoder(scanContext, physicalDataType);
-            BulkFormat<RowData, FileSourceSplit> bulkFormat =
+            BulkFormat<RowData, FileSourceSplit> format;
+            if (deserializationFormat instanceof ProjectableDecodingFormat) {
+                format =
+                        new DeserializationSchemaAdapter(
+                                ((ProjectableDecodingFormat<DeserializationSchema<RowData>>)
+                                                deserializationFormat)
+                                        .createRuntimeDecoder(
+                                                scanContext,
+                                                physicalDataType,
+                                                physicalProjections.toNestedIndexes()));
+            } else {
+                format =
+                        new ProjectingBulkFormat(
+                                new DeserializationSchemaAdapter(
+                                        deserializationFormat.createRuntimeDecoder(
+                                                scanContext, physicalDataType)),
+                                physicalProjections.toTopLevelIndexes(),
+                                scanContext.createTypeInformation(
+                                        physicalProjections.project(physicalDataType)));
+            }
+
+            format =
                     wrapBulkFormat(
-                            new DeserializationSchemaAdapter(decoder),
-                            producedDataType,
-                            metadataToExtract,
-                            partitionKeysToExtract);
-            return createSourceProvider(bulkFormat);
+                            format, producedDataType, metadataToExtract, partitionKeysToExtract);
+            return createSourceProvider(format);
         } else {
             throw new TableException("Can not find format factory.");
         }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/ProjectingBulkFormat.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/ProjectingBulkFormat.java
new file mode 100644
index 0000000..b60dc46
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/ProjectingBulkFormat.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+
+import java.io.IOException;
+
+/**
+ * This {@link BulkFormat} is a wrapper that performs projections for formats that don't support
+ * projections.
+ */
+class ProjectingBulkFormat implements BulkFormat<RowData, FileSourceSplit> {
+
+    private final BulkFormat<RowData, FileSourceSplit> wrapped;
+    private final TypeInformation<RowData> producedType;
+    /** Top-level field indexes handed to {@link ProjectedRowData#from(int[])}. */
+    private final int[] projections;
+
+    public ProjectingBulkFormat(
+            BulkFormat<RowData, FileSourceSplit> wrapped,
+            int[] projections,
+            TypeInformation<RowData> producedType) {
+        this.wrapped = wrapped;
+        this.projections = projections;
+        this.producedType = producedType;
+    }
+
+    @Override
+    public Reader<RowData> createReader(Configuration config, FileSourceSplit split)
+            throws IOException {
+        return wrapReader(wrapped.createReader(config, split));
+    }
+
+    @Override
+    public Reader<RowData> restoreReader(Configuration config, FileSourceSplit split)
+            throws IOException {
+        return wrapReader(wrapped.restoreReader(config, split));
+    }
+
+    @Override
+    public boolean isSplittable() {
+        return wrapped.isSplittable();
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return producedType;
+    }
+
+    /** Decorates {@code reader} so that every record is seen through the projected view. */
+    private Reader<RowData> wrapReader(Reader<RowData> reader) {
+        // A single view is reused for every record: only its backing row is replaced.
+        final ProjectedRowData projectedView = ProjectedRowData.from(this.projections);
+        return RecordMapperWrapperRecordIterator.wrapReader(
+                reader,
+                physicalRowData -> {
+                    projectedView.replaceRow(physicalRowData);
+                    return projectedView;
+                });
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
index 03a80e7..0235cd6 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
@@ -43,9 +44,7 @@ import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_FIELD_DELIMITER;
 import static org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_LINE_DELIMITER;
@@ -120,24 +119,16 @@ public class TestCsvFileSystemFormatFactory
     @Override
     public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
             DynamicTableFactory.Context context, ReadableConfig formatOptions) {
-        List<String> schemaFields =
-                DataType.getFieldNames(context.getPhysicalRowDataType()).stream()
-                        .filter(
-                                field ->
-                                        !context.getCatalogTable()
-                                                .getPartitionKeys()
-                                                .contains(field))
-                        .collect(Collectors.toList());
-        return new DecodingFormat<DeserializationSchema<RowData>>() {
+        return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() {
             @Override
             public DeserializationSchema<RowData> createRuntimeDecoder(
-                    DynamicTableSource.Context context, DataType physicalDataType) {
-                // TestCsvDeserializationSchema has no knowledge of the field names, and the
-                // implicit assumption done by tests is that the csv rows are composed by only the
-                // physical fields (excluding partition fields) in the same order as defined in the
-                // table declaration. This is why TestCsvDeserializationSchema needs
-                // schemaFields.
-                return new TestCsvDeserializationSchema(physicalDataType, schemaFields);
+                    DynamicTableSource.Context context,
+                    DataType physicalDataType,
+                    int[][] projections) {
+                DataType projectedPhysicalDataType =
+                        DataType.projectFields(physicalDataType, projections);
+                return new TestCsvDeserializationSchema(
+                        projectedPhysicalDataType, DataType.getFieldNames(physicalDataType));
             }
 
             @Override