Posted to commits@flink.apache.org by tw...@apache.org on 2021/11/09 14:44:16 UTC

[flink] branch master updated: [FLINK-24615][table] Add infrastructure to support metadata for filesystem

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ea2102  [FLINK-24615][table] Add infrastructure to support metadata for filesystem
1ea2102 is described below

commit 1ea210278f1da41f193a7520306cab20596fd10f
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Tue Oct 19 17:31:29 2021 +0200

    [FLINK-24615][table] Add infrastructure to support metadata for filesystem
    
    Signed-off-by: slinkydeveloper <fr...@gmail.com>
    
    This closes #17544.
---
 docs/content/docs/connectors/table/filesystem.md   |  35 +++
 .../util/RecordMapperWrapperRecordIterator.java    |  64 ++++
 .../flink/formats/avro/AvroFilesystemITCase.java   |   5 +
 .../formats/csv/CsvFilesystemBatchITCase.java      |  10 +
 .../formats/csv/CsvFilesystemStreamITCase.java     |   5 +
 .../flink/table/data/utils/EnrichedRowData.java    | 323 +++++++++++++++++++++
 .../table/data/utils/EnrichedRowDataTest.java      |  75 +++++
 .../filesystem/FileSystemTableSourceTest.java      |  18 ++
 .../table/filesystem/FileSystemTableSourceTest.xml |  19 ++
 .../planner/runtime/FileSystemITCaseBase.scala     |  99 ++++++-
 .../batch/sql/BatchFileSystemITCaseBase.scala      |  19 +-
 .../stream/sql/StreamFileSystemITCaseBase.scala    |  30 +-
 .../stream/sql/StreamFileSystemTestCsvITCase.scala |   2 +
 .../table/filesystem/AbstractFileSystemTable.java  |  14 +-
 .../filesystem/DeserializationSchemaAdapter.java   | 106 +------
 .../filesystem/FileInfoExtractorBulkFormat.java    | 200 +++++++++++++
 .../table/filesystem/FileSystemTableSink.java      |  46 ++-
 .../table/filesystem/FileSystemTableSource.java    | 236 ++++++++++++---
 .../filesystem/TestCsvDeserializationSchema.java   |  60 ++--
 .../filesystem/TestCsvFileSystemFormatFactory.java |  17 +-
 20 files changed, 1180 insertions(+), 203 deletions(-)

diff --git a/docs/content/docs/connectors/table/filesystem.md b/docs/content/docs/connectors/table/filesystem.md
index 7f143b3..82ba012 100644
--- a/docs/content/docs/connectors/table/filesystem.md
+++ b/docs/content/docs/connectors/table/filesystem.md
@@ -108,6 +108,41 @@ The file system connector can be used to read single files or entire directories
 
 When using a directory as the source path, there is **no defined order of ingestion** for the files inside the directory.
 
+### Available Metadata
+
+The following connector metadata can be accessed as metadata columns in a table definition. All metadata columns are read-only.
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Key</th>
+      <th class="text-center" style="width: 30%">Data Type</th>
+      <th class="text-center" style="width: 40%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code>filepath</code></td>
+      <td><code>STRING NOT NULL</code></td>
+      <td>Full path of the input file.</td>
+    </tr>
+    </tbody>
+</table>
+
+The extended `CREATE TABLE` example demonstrates the syntax for exposing these metadata fields:
+
+```sql
+CREATE TABLE MyUserTableWithFilepath (
+  column_name1 INT,
+  column_name2 STRING,
+  filepath STRING NOT NULL METADATA
+) WITH (
+  'connector' = 'filesystem',
+  'path' = 'file:///path/to/whatever',
+  'format' = 'json'
+)
+```
+
 ## Streaming Sink
 
 The file system connector supports streaming writes, based on Flink's [Streaming File Sink]({{< ref "docs/connectors/datastream/streamfile_sink" >}})
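
For reference, such a metadata column is queried like any regular column. A minimal example against the `MyUserTableWithFilepath` table defined above (the `LIKE` filter value is purely illustrative):

```sql
-- Project the regular columns together with the path of the file each record was read from.
SELECT column_name1, column_name2, filepath
FROM MyUserTableWithFilepath
WHERE filepath LIKE 'file:///path/to/whatever/%';
```
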
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordMapperWrapperRecordIterator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordMapperWrapperRecordIterator.java
new file mode 100644
index 0000000..76c4fb0
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordMapperWrapperRecordIterator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+
+/**
+ * Implementation of {@link org.apache.flink.connector.file.src.reader.BulkFormat.RecordIterator}
+ * that wraps another iterator and maps each record using the provided {@link RecordMapper}.
+ *
+ * @param <I> Input type
+ * @param <O> Mapped output type
+ */
+public class RecordMapperWrapperRecordIterator<I, O> implements BulkFormat.RecordIterator<O> {
+
+    /** Record mapper definition. */
+    @FunctionalInterface
+    public interface RecordMapper<I, O> {
+        /** Map the record. Both input value and output value are expected to be non-null. */
+        O map(I in);
+    }
+
+    private final BulkFormat.RecordIterator<I> wrapped;
+    private final RecordMapper<I, O> mapper;
+
+    public RecordMapperWrapperRecordIterator(
+            BulkFormat.RecordIterator<I> wrapped, RecordMapper<I, O> mapper) {
+        this.wrapped = wrapped;
+        this.mapper = mapper;
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    @Override
+    public RecordAndPosition<O> next() {
+        RecordAndPosition record = this.wrapped.next();
+        if (record == null || record.getRecord() == null) {
+            return (RecordAndPosition<O>) record;
+        }
+
+        record.record = mapper.map((I) record.record);
+        return (RecordAndPosition<O>) record;
+    }
+
+    @Override
+    public void releaseBatch() {
+        this.wrapped.releaseBatch();
+    }
+}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFilesystemITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFilesystemITCase.java
index e7a8029..b89c2be 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFilesystemITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFilesystemITCase.java
@@ -46,6 +46,11 @@ public class AvroFilesystemITCase extends BatchFileSystemITCaseBase {
     }
 
     @Override
+    public boolean supportsReadingMetadata() {
+        return false;
+    }
+
+    @Override
     public String[] formatProperties() {
         List<String> ret = new ArrayList<>();
         ret.add("'format'='avro'");
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemBatchITCase.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemBatchITCase.java
index 8df8ddb..f6dadc5 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemBatchITCase.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemBatchITCase.java
@@ -40,6 +40,11 @@ public class CsvFilesystemBatchITCase {
     public static class GeneralCsvFilesystemBatchITCase extends BatchFileSystemITCaseBase {
 
         @Override
+        public boolean supportsReadingMetadata() {
+            return false;
+        }
+
+        @Override
         public String[] formatProperties() {
             List<String> ret = new ArrayList<>();
             ret.add("'format'='csv'");
@@ -56,6 +61,11 @@ public class CsvFilesystemBatchITCase {
     public static class EnrichedCsvFilesystemBatchITCase extends BatchFileSystemITCaseBase {
 
         @Override
+        public boolean supportsReadingMetadata() {
+            return false;
+        }
+
+        @Override
         public String[] formatProperties() {
             List<String> ret = new ArrayList<>();
             ret.add("'format'='csv'");
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java
index 1070682..aa41694 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java
@@ -27,6 +27,11 @@ import java.util.List;
 public class CsvFilesystemStreamITCase extends StreamFileSystemITCaseBase {
 
     @Override
+    public boolean supportsReadingMetadata() {
+        return false;
+    }
+
+    @Override
     public String[] formatProperties() {
         List<String> ret = new ArrayList<>();
         ret.add("'format'='csv'");
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/EnrichedRowData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/EnrichedRowData.java
new file mode 100644
index 0000000..1743563
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/EnrichedRowData.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * An implementation of {@link RowData} which is backed by two {@link RowData} with a well-defined
+ * index mapping. One of the rows is fixed, while the other can be swapped for performant changes
+ * in hot code paths. The {@link RowKind} is inherited from the mutable row.
+ */
+@PublicEvolving
+public class EnrichedRowData implements RowData {
+
+    private final RowData fixedRow;
+    // The index mapping is built as follows: positive indexes refer to mutable row positions,
+    // while negative indexes (with -1 offset) refer to fixed row positions.
+    //
+    // For example, an index mapping [0, 1, -1, -2, 2] means:
+    // * Index 0 -> mutable row index 0
+    // * Index 1 -> mutable row index 1
+    // * Index -1 -> fixed row index 0
+    // * Index -2 -> fixed row index 1
+    // * Index 2 -> mutable row index 2
+    private final int[] indexMapping;
+
+    private RowData mutableRow;
+
+    public EnrichedRowData(RowData fixedRow, int[] indexMapping) {
+        this.fixedRow = fixedRow;
+        this.indexMapping = indexMapping;
+    }
+
+    /**
+     * Replaces the mutable {@link RowData} backing this {@link EnrichedRowData}.
+     *
+     * <p>This method replaces the mutable row data in place and does not return a new object. This
+     * is done for performance reasons.
+     */
+    public EnrichedRowData replaceMutableRow(RowData mutableRow) {
+        this.mutableRow = mutableRow;
+        return this;
+    }
+
+    // ---------------------------------------------------------------------------------------------
+
+    @Override
+    public int getArity() {
+        return indexMapping.length;
+    }
+
+    @Override
+    public RowKind getRowKind() {
+        return mutableRow.getRowKind();
+    }
+
+    @Override
+    public void setRowKind(RowKind kind) {
+        mutableRow.setRowKind(kind);
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.isNullAt(index);
+        } else {
+            return fixedRow.isNullAt(-(index + 1));
+        }
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getBoolean(index);
+        } else {
+            return fixedRow.getBoolean(-(index + 1));
+        }
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getByte(index);
+        } else {
+            return fixedRow.getByte(-(index + 1));
+        }
+    }
+
+    @Override
+    public short getShort(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getShort(index);
+        } else {
+            return fixedRow.getShort(-(index + 1));
+        }
+    }
+
+    @Override
+    public int getInt(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getInt(index);
+        } else {
+            return fixedRow.getInt(-(index + 1));
+        }
+    }
+
+    @Override
+    public long getLong(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getLong(index);
+        } else {
+            return fixedRow.getLong(-(index + 1));
+        }
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getFloat(index);
+        } else {
+            return fixedRow.getFloat(-(index + 1));
+        }
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getDouble(index);
+        } else {
+            return fixedRow.getDouble(-(index + 1));
+        }
+    }
+
+    @Override
+    public StringData getString(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getString(index);
+        } else {
+            return fixedRow.getString(-(index + 1));
+        }
+    }
+
+    @Override
+    public DecimalData getDecimal(int pos, int precision, int scale) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getDecimal(index, precision, scale);
+        } else {
+            return fixedRow.getDecimal(-(index + 1), precision, scale);
+        }
+    }
+
+    @Override
+    public TimestampData getTimestamp(int pos, int precision) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getTimestamp(index, precision);
+        } else {
+            return fixedRow.getTimestamp(-(index + 1), precision);
+        }
+    }
+
+    @Override
+    public <T> RawValueData<T> getRawValue(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getRawValue(index);
+        } else {
+            return fixedRow.getRawValue(-(index + 1));
+        }
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getBinary(index);
+        } else {
+            return fixedRow.getBinary(-(index + 1));
+        }
+    }
+
+    @Override
+    public ArrayData getArray(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getArray(index);
+        } else {
+            return fixedRow.getArray(-(index + 1));
+        }
+    }
+
+    @Override
+    public MapData getMap(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getMap(index);
+        } else {
+            return fixedRow.getMap(-(index + 1));
+        }
+    }
+
+    @Override
+    public RowData getRow(int pos, int numFields) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getRow(index, numFields);
+        } else {
+            return fixedRow.getRow(-(index + 1), numFields);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        EnrichedRowData that = (EnrichedRowData) o;
+        return Objects.equals(this.fixedRow, that.fixedRow)
+                && Objects.equals(this.mutableRow, that.mutableRow);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(fixedRow, mutableRow);
+    }
+
+    @Override
+    public String toString() {
+        return mutableRow.getRowKind().shortString()
+                + "{"
+                + "fixedRow="
+                + fixedRow
+                + ", mutableRow="
+                + mutableRow
+                + '}';
+    }
+
+    /**
+     * Creates a new {@link EnrichedRowData} with the provided {@code fixedRow} as the immutable
+     * static row, and uses the {@code producedRowFields}, {@code fixedRowFields} and {@code
+     * mutableRowFields} arguments to compute the index mapping.
+     *
+     * <p>{@code producedRowFields} should contain the field names of the full row obtained once
+     * the mutable and fixed rows are merged, while {@code fixedRowFields} and {@code
+     * mutableRowFields} should contain the field names of the fixed row and the mutable row,
+     * respectively. All the lists are ordered with indexes matching the position of the field in
+     * the row. As an example, for a complete row {@code (a, b, c)} the mutable row might be
+     * {@code (a, c)} and the fixed row might be {@code (b)}.
+     */
+    public static EnrichedRowData from(
+            RowData fixedRow,
+            List<String> producedRowFields,
+            List<String> mutableRowFields,
+            List<String> fixedRowFields) {
+        return new EnrichedRowData(
+                fixedRow, computeIndexMapping(producedRowFields, mutableRowFields, fixedRowFields));
+    }
+
+    /**
+     * This method computes the index mapping for {@link EnrichedRowData}.
+     *
+     * @see EnrichedRowData#from(RowData, List, List, List)
+     */
+    public static int[] computeIndexMapping(
+            List<String> producedRowFields,
+            List<String> mutableRowFields,
+            List<String> fixedRowFields) {
+        int[] indexMapping = new int[producedRowFields.size()];
+
+        for (int i = 0; i < producedRowFields.size(); i++) {
+            String fieldName = producedRowFields.get(i);
+
+            int newIndex = mutableRowFields.indexOf(fieldName);
+            if (newIndex < 0) {
+                newIndex = -(fixedRowFields.indexOf(fieldName) + 1);
+            }
+
+            indexMapping[i] = newIndex;
+        }
+
+        return indexMapping;
+    }
+}
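
The index-mapping contract can be checked against the example in the class comment. A minimal sketch (class and field names here are illustrative, not part of the commit):

```java
import org.apache.flink.table.data.utils.EnrichedRowData;

import java.util.Arrays;
import java.util.List;

public class IndexMappingSketch {
    public static void main(String[] args) {
        // Produced row (a, b, x, y, c), merged from mutable (a, b, c) and fixed (x, y).
        List<String> produced = Arrays.asList("a", "b", "x", "y", "c");
        List<String> mutable = Arrays.asList("a", "b", "c");
        List<String> fixed = Arrays.asList("x", "y");

        int[] mapping = EnrichedRowData.computeIndexMapping(produced, mutable, fixed);

        // Prints [0, 1, -1, -2, 2]: non-negative entries index the mutable row,
        // negative entries (with -1 offset) index the fixed row.
        System.out.println(Arrays.toString(mapping));
    }
}
```
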
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/utils/EnrichedRowDataTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/utils/EnrichedRowDataTest.java
new file mode 100644
index 0000000..70a42f9
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/utils/EnrichedRowDataTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.utils;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for {@link EnrichedRowData}. */
+public class EnrichedRowDataTest {
+
+    @Test
+    public void testJoinedRows() {
+        final List<String> completeRowFields =
+                Arrays.asList(
+                        "fixedRow1",
+                        "mutableRow1",
+                        "mutableRow3",
+                        "mutableRow2",
+                        "fixedRow2",
+                        "mutableRow4");
+        final List<String> mutableRowFields =
+                Arrays.asList("mutableRow1", "mutableRow2", "mutableRow3", "mutableRow4");
+        final List<String> fixedRowFields = Arrays.asList("fixedRow1", "fixedRow2");
+
+        final RowData fixedRowData = GenericRowData.of(1L, 2L);
+        final EnrichedRowData enrichedRowData =
+                EnrichedRowData.from(
+                        fixedRowData, completeRowFields, mutableRowFields, fixedRowFields);
+        final RowData mutableRowData = GenericRowData.of(3L, 4L, 5L, 6L);
+        enrichedRowData.replaceMutableRow(mutableRowData);
+
+        assertEquals(RowKind.INSERT, enrichedRowData.getRowKind());
+        assertEquals(6, enrichedRowData.getArity());
+        assertEquals(1L, enrichedRowData.getLong(0));
+        assertEquals(3L, enrichedRowData.getLong(1));
+        assertEquals(5L, enrichedRowData.getLong(2));
+        assertEquals(4L, enrichedRowData.getLong(3));
+        assertEquals(2L, enrichedRowData.getLong(4));
+        assertEquals(6L, enrichedRowData.getLong(5));
+
+        final RowData newMutableRowData = GenericRowData.of(7L, 8L, 9L, 10L);
+        enrichedRowData.replaceMutableRow(newMutableRowData);
+
+        assertEquals(1L, enrichedRowData.getLong(0));
+        assertEquals(7L, enrichedRowData.getLong(1));
+        assertEquals(9L, enrichedRowData.getLong(2));
+        assertEquals(8L, enrichedRowData.getLong(3));
+        assertEquals(2L, enrichedRowData.getLong(4));
+        assertEquals(10L, enrichedRowData.getLong(5));
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/filesystem/FileSystemTableSourceTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/filesystem/FileSystemTableSourceTest.java
index 0827988..958e66f 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/filesystem/FileSystemTableSourceTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/filesystem/FileSystemTableSourceTest.java
@@ -47,6 +47,18 @@ public class FileSystemTableSourceTest extends TableTestBase {
                         + " 'path' = '/tmp')";
         tEnv.executeSql(srcTableDdl);
 
+        String srcTableWithMetaDdl =
+                "CREATE TABLE MyTableWithMeta (\n"
+                        + "  a bigint,\n"
+                        + "  b int,\n"
+                        + "  c varchar,\n"
+                        + "  filemeta STRING METADATA FROM 'filepath'\n"
+                        + ") with (\n"
+                        + " 'connector' = 'filesystem',"
+                        + " 'format' = 'testcsv',"
+                        + " 'path' = '/tmp')";
+        tEnv.executeSql(srcTableWithMetaDdl);
+
         String sinkTableDdl =
                 "CREATE TABLE MySink (\n"
                         + "  a bigint,\n"
@@ -62,4 +74,10 @@ public class FileSystemTableSourceTest extends TableTestBase {
     public void testFilterPushDown() {
         util.verifyRelPlanInsert("insert into MySink select * from MyTable where a > 10");
     }
+
+    @Test
+    public void testMetadataReading() {
+        util.verifyRelPlanInsert(
+                "insert into MySink(a, b, c) select a, b, filemeta from MyTableWithMeta");
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/filesystem/FileSystemTableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/filesystem/FileSystemTableSourceTest.xml
index 97cde4f..087251c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/filesystem/FileSystemTableSourceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/filesystem/FileSystemTableSourceTest.xml
@@ -36,4 +36,23 @@ Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testMetadataReading">
+    <Resource name="sql">
+      <![CDATA[insert into MySink(a, b, c) select a, b, filemeta from MyTableWithMeta]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, filemeta])
++- LogicalProject(a=[$0], b=[$1], filemeta=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTableWithMeta]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.MySink], fields=[a, b, filemeta])
++- Calc(select=[a, b, CAST(filepath) AS filemeta])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTableWithMeta, project=[a, b], metadata=[filepath]]], fields=[a, b, filepath])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
index d80e5b4..85a81d4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
@@ -27,13 +27,11 @@ import org.apache.flink.table.planner.runtime.FileSystemITCaseBase._
 import org.apache.flink.table.planner.runtime.utils.BatchTableEnvUtil
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.types.Row
-
-import org.junit.Assert.assertTrue
+import org.junit.Assert.{assertEquals, assertNotNull, assertTrue}
 import org.junit.rules.TemporaryFolder
 import org.junit.{Rule, Test}
 
 import java.io.File
-
 import scala.collection.{JavaConverters, Seq}
 
 /**
@@ -51,6 +49,8 @@ trait FileSystemITCaseBase {
 
   def tableEnv: TableEnvironment
 
+  def checkPredicate(sqlQuery: String, checkFunc: Row => Unit): Unit
+
   def check(sqlQuery: String, expectedResult: Seq[Row]): Unit
 
   def check(sqlQuery: String, expectedResult: java.util.List[Row]): Unit = {
@@ -58,6 +58,8 @@ trait FileSystemITCaseBase {
       JavaConverters.asScalaIteratorConverter(expectedResult.iterator()).asScala.toSeq)
   }
 
+  def supportsReadingMetadata: Boolean = true
+
   def open(): Unit = {
     resultPath = fileTmpFolder.newFolder().toURI.toString
     BatchTableEnvUtil.registerCollection(
@@ -81,6 +83,24 @@ trait FileSystemITCaseBase {
          |)
        """.stripMargin
     )
+    if (supportsReadingMetadata) {
+      tableEnv.executeSql(
+        s"""
+           |create table partitionedTableWithMetadata (
+           |  x string,
+           |  y int,
+           |  a int,
+           |  b bigint,
+           |  c as b + 1,
+           |  f string metadata from 'filepath'
+           |) partitioned by (a, b) with (
+           |  'connector' = 'filesystem',
+           |  'path' = '$resultPath',
+           |  ${formatProperties().mkString(",\n")}
+           |)
+           """.stripMargin
+      )
+    }
     tableEnv.executeSql(
       s"""
          |create table nonPartitionedTable (
@@ -95,6 +115,23 @@ trait FileSystemITCaseBase {
          |)
        """.stripMargin
     )
+    if (supportsReadingMetadata) {
+      tableEnv.executeSql(
+        s"""
+           |create table nonPartitionedTableWithMetadata (
+           |  x string,
+           |  y int,
+           |  a int,
+           |  f string metadata from 'filepath',
+           |  b bigint
+           |) with (
+           |  'connector' = 'filesystem',
+           |  'path' = '$resultPath',
+           |  ${formatProperties().mkString(",\n")}
+           |)
+         """.stripMargin
+      )
+    }
 
     tableEnv.executeSql(
       s"""
@@ -184,6 +221,40 @@ trait FileSystemITCaseBase {
   }
 
   @Test
+  def testAllStaticPartitionsWithMetadata(): Unit = {
+    if (!supportsReadingMetadata) {
+      return
+    }
+
+    tableEnv.executeSql("insert into partitionedTable " +
+      "partition(a='1', b='1') select x, y from originalT where a=1 and b=1").await()
+
+    checkPredicate(
+      "select x, f, y from partitionedTableWithMetadata where a=1 and b=1",
+      row => {
+        assertEquals(3, row.getArity)
+        assertNotNull(row.getField("f"))
+        assertNotNull(row.getField(1))
+        assertTrue(
+          "The filepath value should begin with the temporary test path",
+          row.getFieldAs[String](1).contains(fileTmpFolder.getRoot.getPath))
+      }
+    )
+
+    checkPredicate(
+      "select x, f, y from partitionedTableWithMetadata",
+      row => {
+        assertEquals(3, row.getArity)
+        assertNotNull(row.getField("f"))
+        assertNotNull(row.getField(1))
+        assertTrue(
+          "The filepath value should begin with the temporary test path",
+          row.getFieldAs[String](1).contains(fileTmpFolder.getRoot.getPath))
+      }
+    )
+  }
+
+  @Test
   def testPartialDynamicPartition(): Unit = {
     tableEnv.executeSql("insert into partitionedTable " +
         "partition(a=3) select x, y, b from originalT where a=3").await()
@@ -276,6 +347,28 @@ trait FileSystemITCaseBase {
   }
 
   @Test
+  def testNonPartitionWithMetadata(): Unit = {
+    if (!supportsReadingMetadata) {
+      return
+    }
+
+    tableEnv.executeSql("insert into nonPartitionedTable " +
+      "select x, y, a, b from originalT where a=1 and b=1").await()
+
+    checkPredicate(
+      "select x, f, y from nonPartitionedTableWithMetadata where a=1 and b=1",
+      row => {
+        assertEquals(3, row.getArity)
+        assertNotNull(row.getField("f"))
+        assertNotNull(row.getField(1))
+        assertTrue(
+          "The filepath value should begin with the temporary test path",
+          row.getFieldAs[String](1).contains(fileTmpFolder.getRoot.getPath))
+      }
+    )
+  }
+
+  @Test
   def testLimitPushDown(): Unit = {
     tableEnv.getConfig.getConfiguration.setInteger(
       ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/BatchFileSystemITCaseBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/BatchFileSystemITCaseBase.scala
index b7d923c..86797e1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/BatchFileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/BatchFileSystemITCaseBase.scala
@@ -22,9 +22,9 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.planner.runtime.FileSystemITCaseBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.types.Row
+import org.junit.{Assert, Before}
 
-import org.junit.Before
-
+import java.lang.AssertionError
 import scala.collection.Seq
 
 /**
@@ -45,4 +45,19 @@ abstract class BatchFileSystemITCaseBase extends BatchTestBase with FileSystemIT
   override def check(sqlQuery: String, expectedResult: Seq[Row]): Unit = {
     checkResult(sqlQuery, expectedResult)
   }
+
+  override def checkPredicate(sqlQuery: String, checkFunc: Row => Unit): Unit = {
+    val table = parseQuery(sqlQuery)
+    val result = executeQuery(table)
+
+    try {
+      result.foreach(checkFunc)
+    } catch {
+      case e: AssertionError => throw new AssertionError(
+        s"""
+           |Results do not match for query:
+           |  $sqlQuery
+     """.stripMargin, e)
+    }
+  }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala
index 5b78fee..a0fd403 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala
@@ -19,16 +19,16 @@
 package org.apache.flink.table.planner.runtime.stream.sql
 
 import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.runtime.FileSystemITCaseBase
-import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestSinkUtil, TestingAppendSink}
+import org.apache.flink.table.planner.runtime.utils.{AbstractExactlyOnceSink, StreamingTestBase, TestSinkUtil, TestingAppendSink}
 import org.apache.flink.types.Row
-
 import org.junit.Assert.assertEquals
-import org.junit.{Before, Test}
+import org.junit.{Assert, Before, Test}
 
-import scala.collection.Seq
+import scala.collection.{Seq, mutable}
 
 /**
   * Streaming [[FileSystemITCaseBase]].
@@ -56,6 +56,28 @@ abstract class StreamFileSystemITCaseBase extends StreamingTestBase with FileSys
       sink.getAppendResults.sorted)
   }
 
+  override def checkPredicate(sqlQuery: String, checkFunc: Row => Unit): Unit = {
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sinkResults = new mutable.MutableList[Row]
+
+    val sink = new AbstractExactlyOnceSink[Row] {
+      override def invoke(value: Row, context: SinkFunction.Context): Unit =
+        sinkResults += value
+    }
+    result.addSink(sink)
+    env.execute()
+
+    try {
+      sinkResults.foreach(checkFunc)
+    } catch {
+      case e: AssertionError => throw new AssertionError(
+        s"""
+           |Results do not match for query:
+           |  $sqlQuery
+       """.stripMargin, e)
+    }
+  }
+
   // Streaming mode does not support overwrite
   @Test
   override def testInsertOverwrite(): Unit = {}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala
index 4ead229..065abd1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.planner.runtime.stream.sql
 
+import org.apache.flink.types.Row
+
 import scala.collection.Seq
 
 /**
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/AbstractFileSystemTable.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/AbstractFileSystemTable.java
index c8bcdd3..b96a371 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/AbstractFileSystemTable.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/AbstractFileSystemTable.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.types.DataType;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.filesystem.FileSystemConnectorOptions.PARTITION_DEFAULT_NAME;
 import static org.apache.flink.table.filesystem.FileSystemConnectorOptions.PATH;
@@ -40,19 +41,21 @@ abstract class AbstractFileSystemTable {
     final ObjectIdentifier tableIdentifier;
     final Configuration tableOptions;
     final ResolvedSchema schema;
-    final List<String> partitionKeys;
     final Path path;
     final String defaultPartName;
 
+    List<String> partitionKeys;
+
     AbstractFileSystemTable(DynamicTableFactory.Context context) {
         this.context = context;
         this.tableIdentifier = context.getObjectIdentifier();
         this.tableOptions = new Configuration();
         context.getCatalogTable().getOptions().forEach(tableOptions::setString);
         this.schema = context.getCatalogTable().getResolvedSchema();
-        this.partitionKeys = context.getCatalogTable().getPartitionKeys();
         this.path = new Path(tableOptions.get(PATH));
         this.defaultPartName = tableOptions.get(PARTITION_DEFAULT_NAME);
+
+        this.partitionKeys = context.getCatalogTable().getPartitionKeys();
     }
 
     ReadableConfig formatOptions(String identifier) {
@@ -64,9 +67,8 @@ abstract class AbstractFileSystemTable {
     }
 
     DataType getPhysicalDataTypeWithoutPartitionColumns() {
-        return DataTypes.ROW(
-                DataType.getFields(getPhysicalDataType()).stream()
-                        .filter(field -> !partitionKeys.contains(field.getName()))
-                        .toArray(DataTypes.Field[]::new));
+        return DataType.getFields(getPhysicalDataType()).stream()
+                .filter(field -> !partitionKeys.contains(field.getName()))
+                .collect(Collectors.collectingAndThen(Collectors.toList(), DataTypes::ROW));
     }
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/DeserializationSchemaAdapter.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/DeserializationSchemaAdapter.java
index 68fae2b..73851fa 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/DeserializationSchemaAdapter.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/DeserializationSchemaAdapter.java
@@ -28,14 +28,7 @@ import org.apache.flink.connector.file.src.util.ArrayResultIterator;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.utils.PartitionPathUtils;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.UserCodeClassLoader;
@@ -45,10 +38,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
 import java.util.Queue;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.connector.file.src.util.CheckpointedPosition.NO_OFFSET;
 import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE;
@@ -58,71 +48,10 @@ public class DeserializationSchemaAdapter implements BulkFormat<RowData, FileSou
 
     private static final int BATCH_SIZE = 100;
 
-    // NOTE, deserializationSchema produce full format fields with original order
     private final DeserializationSchema<RowData> deserializationSchema;
 
-    private final String[] fieldNames;
-    private final DataType[] fieldTypes;
-    private final int[] projectFields;
-    private final RowType projectedRowType;
-
-    private final List<String> partitionKeys;
-    private final String defaultPartValue;
-
-    private final int[] toProjectedField;
-    private final RowData.FieldGetter[] formatFieldGetters;
-
-    public DeserializationSchemaAdapter(
-            DeserializationSchema<RowData> deserializationSchema,
-            DataType physicalDataType,
-            int[] projectFields,
-            List<String> partitionKeys,
-            String defaultPartValue) {
+    public DeserializationSchemaAdapter(DeserializationSchema<RowData> deserializationSchema) {
         this.deserializationSchema = deserializationSchema;
-        this.fieldNames = DataType.getFieldNames(physicalDataType).toArray(new String[0]);
-        this.fieldTypes = DataType.getFieldDataTypes(physicalDataType).toArray(new DataType[0]);
-        this.projectFields = projectFields;
-        this.partitionKeys = partitionKeys;
-        this.defaultPartValue = defaultPartValue;
-
-        List<String> projectedNames =
-                Arrays.stream(projectFields)
-                        .mapToObj(idx -> this.fieldNames[idx])
-                        .collect(Collectors.toList());
-
-        this.projectedRowType =
-                RowType.of(
-                        Arrays.stream(projectFields)
-                                .mapToObj(idx -> this.fieldTypes[idx].getLogicalType())
-                                .toArray(LogicalType[]::new),
-                        projectedNames.toArray(new String[0]));
-
-        List<String> formatFields =
-                Arrays.stream(this.fieldNames)
-                        .filter(field -> !partitionKeys.contains(field))
-                        .collect(Collectors.toList());
-
-        List<String> formatProjectedFields =
-                projectedNames.stream()
-                        .filter(field -> !partitionKeys.contains(field))
-                        .collect(Collectors.toList());
-
-        this.toProjectedField =
-                formatProjectedFields.stream().mapToInt(projectedNames::indexOf).toArray();
-
-        this.formatFieldGetters = new RowData.FieldGetter[formatProjectedFields.size()];
-        final Map<String, DataType> fieldDataTypesMap =
-                DataType.getFields(physicalDataType).stream()
-                        .collect(
-                                Collectors.toMap(
-                                        DataTypes.Field::getName, DataTypes.Field::getDataType));
-        for (int i = 0; i < formatProjectedFields.size(); i++) {
-            String name = formatProjectedFields.get(i);
-            this.formatFieldGetters[i] =
-                    RowData.createFieldGetter(
-                            fieldDataTypesMap.get(name).getLogicalType(),
-                            formatFields.indexOf(name));
-        }
     }
 
     private DeserializationSchema<RowData> createDeserialization() throws IOException {
@@ -168,7 +97,7 @@ public class DeserializationSchemaAdapter implements BulkFormat<RowData, FileSou
 
     @Override
     public TypeInformation<RowData> getProducedType() {
-        return InternalTypeInfo.of(projectedRowType);
+        return deserializationSchema.getProducedType();
     }
 
     private class Reader implements BulkFormat.Reader<RowData> {
@@ -233,7 +162,6 @@ public class DeserializationSchemaAdapter implements BulkFormat<RowData, FileSou
 
         private transient boolean end;
         private transient RecordCollector collector;
-        private transient GenericRowData rowData;
 
         public LineBytesInputFormat(Path path, Configuration config) throws IOException {
             super(path, config);
@@ -245,22 +173,6 @@ public class DeserializationSchemaAdapter implements BulkFormat<RowData, FileSou
             super.open(split);
             this.end = false;
             this.collector = new RecordCollector();
-            this.rowData =
-                    PartitionPathUtils.fillPartitionValueForRecord(
-                            fieldNames,
-                            fieldTypes,
-                            projectFields,
-                            partitionKeys,
-                            currentSplit.getPath(),
-                            defaultPartValue);
-        }
-
-        private GenericRowData newOutputRow() {
-            GenericRowData row = new GenericRowData(rowData.getArity());
-            for (int i = 0; i < row.getArity(); i++) {
-                row.setField(i, rowData.getField(i));
-            }
-            return row;
         }
 
         @Override
@@ -284,24 +196,12 @@ public class DeserializationSchemaAdapter implements BulkFormat<RowData, FileSou
             return null;
         }
 
-        private RowData convert(RowData record) {
-            GenericRowData outputRow = newOutputRow();
-
-            for (int i = 0; i < toProjectedField.length; i++) {
-                outputRow.setField(
-                        toProjectedField[i], formatFieldGetters[i].getFieldOrNull(record));
-            }
-
-            outputRow.setRowKind(record.getRowKind());
-            return outputRow;
-        }
-
         @Override
         public RowData nextRecord(RowData reuse) throws IOException {
             while (true) {
                 RowData record = collector.records.poll();
                 if (record != null) {
-                    return convert(record);
+                    return record;
                 }
 
                 if (readLine()) {
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileInfoExtractorBulkFormat.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileInfoExtractorBulkFormat.java
new file mode 100644
index 0000000..1a0742e
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileInfoExtractorBulkFormat.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.EnrichedRowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.PartitionPathUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This {@link BulkFormat} is a wrapper that attaches file information columns to the output
+ * records.
+ */
+class FileInfoExtractorBulkFormat implements BulkFormat<RowData, FileSourceSplit> {
+
+    private final BulkFormat<RowData, FileSourceSplit> wrapped;
+    private final TypeInformation<RowData> producedType;
+
+    private final List<FileSystemTableSource.FileInfoAccessor> metadataColumnsFunctions;
+    private final List<Map.Entry<String, DataType>> partitionColumnTypes;
+    private final int[] extendedRowIndexMapping;
+
+    private final String defaultPartName;
+
+    public FileInfoExtractorBulkFormat(
+            BulkFormat<RowData, FileSourceSplit> wrapped,
+            DataType producedDataType,
+            Map<String, FileSystemTableSource.FileInfoAccessor> metadataColumns,
+            List<String> partitionColumns,
+            String defaultPartName) {
+        this.wrapped = wrapped;
+        this.producedType = InternalTypeInfo.of(producedDataType.getLogicalType());
+        this.defaultPartName = defaultPartName;
+
+        // Compute index mapping for the extended row and the functions to compute metadata
+        List<DataTypes.Field> producedRowField = DataType.getFields(producedDataType);
+        List<String> producedRowFieldNames =
+                producedRowField.stream()
+                        .map(DataTypes.Field::getName)
+                        .collect(Collectors.toList());
+        List<String> mutableRowFieldNames =
+                producedRowFieldNames.stream()
+                        .filter(
+                                key ->
+                                        !metadataColumns.containsKey(key)
+                                                && !partitionColumns.contains(key))
+                        .collect(Collectors.toList());
+        List<String> metadataFieldNames = new ArrayList<>(metadataColumns.keySet());
+
+        List<String> fixedRowFieldNames =
+                Stream.concat(metadataFieldNames.stream(), partitionColumns.stream())
+                        .collect(Collectors.toList());
+
+        this.partitionColumnTypes =
+                partitionColumns.stream()
+                        .map(
+                                fieldName ->
+                                        new SimpleImmutableEntry<>(
+                                                fieldName,
+                                                producedRowField
+                                                        .get(
+                                                                producedRowFieldNames.indexOf(
+                                                                        fieldName))
+                                                        .getDataType()))
+                        .collect(Collectors.toList());
+
+        this.extendedRowIndexMapping =
+                EnrichedRowData.computeIndexMapping(
+                        producedRowFieldNames, mutableRowFieldNames, fixedRowFieldNames);
+        this.metadataColumnsFunctions =
+                metadataFieldNames.stream().map(metadataColumns::get).collect(Collectors.toList());
+    }
+
+    @Override
+    public Reader<RowData> createReader(Configuration config, FileSourceSplit split)
+            throws IOException {
+        return wrapReader(wrapped.createReader(config, split), split);
+    }
+
+    @Override
+    public Reader<RowData> restoreReader(Configuration config, FileSourceSplit split)
+            throws IOException {
+        return wrapReader(wrapped.restoreReader(config, split), split);
+    }
+
+    @Override
+    public boolean isSplittable() {
+        return wrapped.isSplittable();
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return producedType;
+    }
+
+    private Reader<RowData> wrapReader(Reader<RowData> superReader, FileSourceSplit split) {
+        // Fill the metadata + partition columns row
+        final GenericRowData fileInfoRowData =
+                new GenericRowData(metadataColumnsFunctions.size() + partitionColumnTypes.size());
+        int fileInfoRowIndex = 0;
+        for (; fileInfoRowIndex < metadataColumnsFunctions.size(); fileInfoRowIndex++) {
+            fileInfoRowData.setField(
+                    fileInfoRowIndex,
+                    metadataColumnsFunctions.get(fileInfoRowIndex).getValue(split));
+        }
+        if (!partitionColumnTypes.isEmpty()) {
+            final LinkedHashMap<String, String> partitionSpec =
+                    PartitionPathUtils.extractPartitionSpecFromPath(split.path());
+            for (int partitionFieldIndex = 0;
+                    fileInfoRowIndex < fileInfoRowData.getArity();
+                    fileInfoRowIndex++, partitionFieldIndex++) {
+                final String fieldName = partitionColumnTypes.get(partitionFieldIndex).getKey();
+                final DataType fieldType = partitionColumnTypes.get(partitionFieldIndex).getValue();
+                if (!partitionSpec.containsKey(fieldName)) {
+                    throw new RuntimeException(
+                            "Cannot find the partition value from path for partition: "
+                                    + fieldName);
+                }
+
+                String valueStr = partitionSpec.get(fieldName);
+                valueStr = valueStr.equals(defaultPartName) ? null : valueStr;
+                fileInfoRowData.setField(
+                        fileInfoRowIndex,
+                        PartitionPathUtils.convertStringToInternalValue(valueStr, fieldType));
+            }
+        }
+
+        // This row is going to be reused for every record
+        final EnrichedRowData producedRowData =
+                new EnrichedRowData(fileInfoRowData, this.extendedRowIndexMapping);
+
+        return new ReaderWrapper(superReader, producedRowData);
+    }
+
+    private static final class ReaderWrapper implements Reader<RowData> {
+
+        private final Reader<RowData> wrappedReader;
+        private final EnrichedRowData producedRowData;
+
+        private ReaderWrapper(Reader<RowData> wrappedReader, EnrichedRowData producedRowData) {
+            this.wrappedReader = wrappedReader;
+            this.producedRowData = producedRowData;
+        }
+
+        @Nullable
+        @Override
+        public RecordIterator<RowData> readBatch() throws IOException {
+            RecordIterator<RowData> iterator = wrappedReader.readBatch();
+            if (iterator == null) {
+                return null;
+            }
+            return new RecordMapperWrapperRecordIterator<>(
+                    iterator,
+                    physicalRowData -> {
+                        producedRowData.replaceMutableRow(physicalRowData);
+                        return producedRowData;
+                    });
+        }
+
+        @Override
+        public void close() throws IOException {
+            this.wrappedReader.close();
+        }
+    }
+}
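
To make the wiring above concrete: per split, wrapReader() builds a fixed row holding the metadata and partition values, and every physical record of the batch is then swapped in as the mutable row of a single reused EnrichedRowData. A minimal, self-contained sketch of that idea (class name, field names and values are made up for illustration; the real wiring lives in wrapReader() and ReaderWrapper above):

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.utils.EnrichedRowData;

import java.util.Arrays;

public class FileInfoRowSketch {
    public static void main(String[] args) {
        // Produced row (x, y, filepath, part): physical fields plus metadata and partition columns.
        int[] mapping =
                EnrichedRowData.computeIndexMapping(
                        Arrays.asList("x", "y", "filepath", "part"),
                        Arrays.asList("x", "y"), // mutable: fields read from the file
                        Arrays.asList("filepath", "part")); // fixed: metadata, then partition keys

        // Fixed row computed once per split (hard-coded stand-ins for the real split values).
        RowData fileInfoRow =
                GenericRowData.of(
                        StringData.fromString("file:///tmp/part=1/data.csv"),
                        StringData.fromString("1"));
        EnrichedRowData producedRow = new EnrichedRowData(fileInfoRow, mapping);

        // For every record of the batch, only the mutable row is replaced.
        RowData physicalRow = GenericRowData.of(StringData.fromString("hello"), 42);
        producedRow.replaceMutableRow(physicalRow);

        System.out.println(producedRow.getString(2)); // file:///tmp/part=1/data.csv
        System.out.println(producedRow.getInt(1)); // 42
    }
}
```
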
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
index 154e51e..2e9af35 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
@@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSin
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.BucketsBuilder;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
@@ -281,12 +282,8 @@ public class FileSystemTableSink extends AbstractFileSystemTable
     }
 
     private Optional<CompactReader.Factory<RowData>> createCompactReaderFactory(Context context) {
-        if (bulkReaderFormat != null) {
-            final BulkFormat<RowData, FileSourceSplit> format =
-                    bulkReaderFormat.createRuntimeDecoder(
-                            createSourceContext(context), getPhysicalDataType());
-            return Optional.of(CompactBulkReader.factory(format));
-        } else if (formatFactory != null) {
+        // TODO FLINK-19845 old format factory, to be removed soon.
+        if (formatFactory != null) {
             final InputFormat<RowData, ?> format =
                     formatFactory.createReader(createReaderContext());
             if (format instanceof FileInputFormat) {
@@ -294,19 +291,36 @@ public class FileSystemTableSink extends AbstractFileSystemTable
                 return Optional.of(
                         FileInputFormatCompactReader.factory((FileInputFormat<RowData>) format));
             }
+            return Optional.empty();
+        }
+
+        // Compute producedDataType (including partition fields) and physicalDataType (excluding
+        // partition fields)
+        final DataType producedDataType = getPhysicalDataType();
+        final DataType physicalDataType =
+                DataType.getFields(producedDataType).stream()
+                        .filter(field -> !partitionKeys.contains(field.getName()))
+                        .collect(Collectors.collectingAndThen(Collectors.toList(), DataTypes::ROW));
+
+        if (bulkReaderFormat != null) {
+            final BulkFormat<RowData, FileSourceSplit> format =
+                    new FileInfoExtractorBulkFormat(
+                            bulkReaderFormat.createRuntimeDecoder(
+                                    createSourceContext(context), physicalDataType),
+                            producedDataType,
+                            Collections.emptyMap(),
+                            partitionKeys,
+                            defaultPartName);
+            return Optional.of(CompactBulkReader.factory(format));
         } else if (deserializationFormat != null) {
-            // NOTE, we need pass full format types to deserializationFormat
             final DeserializationSchema<RowData> decoder =
                     deserializationFormat.createRuntimeDecoder(
-                            createSourceContext(context),
-                            getPhysicalDataTypeWithoutPartitionColumns());
-            final int[] projectedFields =
-                    IntStream.range(0, DataType.getFieldCount(getPhysicalDataType())).toArray();
-            DeserializationSchemaAdapter format =
-                    new DeserializationSchemaAdapter(
-                            decoder,
-                            getPhysicalDataType(),
-                            projectedFields,
+                            createSourceContext(context), physicalDataType);
+            final BulkFormat<RowData, FileSourceSplit> format =
+                    new FileInfoExtractorBulkFormat(
+                            new DeserializationSchemaAdapter(decoder),
+                            producedDataType,
+                            Collections.emptyMap(),
                             partitionKeys,
                             defaultPartName);
             return Optional.of(CompactBulkReader.factory(format));
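
Both this compact-reader path and the source changes below hand `defaultPartName` to `FileInfoExtractorBulkFormat`, which recovers partition values from the split path instead of from the serialized records. A standalone sketch of that recovery for Hive-style `key=value` directories follows; the helper is invented for illustration and is not the actual `PartitionPathUtils` code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionSpecSketch {

    // Collect "key=value" path segments in order; non-partition segments are ignored.
    static Map<String, String> extractPartitionSpec(String path) {
        Map<String, String> spec = new LinkedHashMap<>();
        for (String segment : path.split("/")) {
            int eq = segment.indexOf('=');
            if (eq > 0) {
                spec.put(segment.substring(0, eq), segment.substring(eq + 1));
            }
        }
        return spec;
    }

    public static void main(String[] args) {
        String defaultPartName = "__DEFAULT_PARTITION__"; // placeholder written for NULL values
        Map<String, String> spec =
                extractPartitionSpec("/warehouse/t/dt=2021-01-01/hr=__DEFAULT_PARTITION__/part-0");
        // Map the default partition name back to NULL, as the wrapping format does.
        spec.replaceAll((key, value) -> value.equals(defaultPartName) ? null : value);
        System.out.println(spec); // prints: {dt=2021-01-01, hr=null}
    }
}
```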
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
index f187910..d1b12a7 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
@@ -28,6 +28,7 @@ import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
@@ -42,7 +43,9 @@ import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
@@ -54,6 +57,7 @@ import org.apache.flink.table.utils.TableSchemaUtils;
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -72,16 +76,20 @@ public class FileSystemTableSource extends AbstractFileSystemTable
                 SupportsProjectionPushDown,
                 SupportsLimitPushDown,
                 SupportsPartitionPushDown,
-                SupportsFilterPushDown {
+                SupportsFilterPushDown,
+                SupportsReadingMetadata {
 
     @Nullable private final DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat;
     @Nullable private final DecodingFormat<DeserializationSchema<RowData>> deserializationFormat;
     @Nullable private final FileSystemFormatFactory formatFactory;
 
-    private int[][] projectedFields;
+    // These mutable fields are set by the planner through the push-down and metadata interfaces.
     private List<Map<String, String>> remainingPartitions;
     private List<ResolvedExpression> filters;
     private Long limit;
+    private int[][] projectFields;
+    private List<String> metadataKeys;
+    private DataType producedDataType;
 
     public FileSystemTableSource(
             DynamicTableFactory.Context context,
@@ -101,54 +109,123 @@ public class FileSystemTableSource extends AbstractFileSystemTable
         this.bulkReaderFormat = bulkReaderFormat;
         this.deserializationFormat = deserializationFormat;
         this.formatFactory = formatFactory;
+
+        this.producedDataType = context.getPhysicalRowDataType();
     }
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        // When the table is partitioned but no partitions are found, return an empty source.
         if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
-            // When this table has no partition, just return a empty source.
             return InputFormatProvider.of(new CollectionInputFormat<>(new ArrayList<>(), null));
-        } else if (bulkReaderFormat != null) {
-            if (bulkReaderFormat instanceof BulkDecodingFormat
-                    && filters != null
-                    && filters.size() > 0) {
-                ((BulkDecodingFormat<RowData>) bulkReaderFormat).applyFilters(filters);
+        }
+
+        // Physical type is computed from the full data type, filtering out partition and
+        // metadata columns. This type is going to be used by formats to parse the input.
+        List<DataTypes.Field> producedDataTypeFields = DataType.getFields(producedDataType);
+        if (metadataKeys != null && !metadataKeys.isEmpty()) {
+            // If metadata keys are present, then by the SupportsReadingMetadata contract all the
+            // metadata columns are at the end of the producedDataType, so we can just remove the
+            // last metadataKeys.size() fields from the list.
+            producedDataTypeFields =
+                    producedDataTypeFields.subList(
+                            0, producedDataTypeFields.size() - metadataKeys.size());
+        }
+        DataType physicalDataType =
+                producedDataTypeFields.stream()
+                        .filter(f -> partitionKeys == null || !partitionKeys.contains(f.getName()))
+                        .collect(Collectors.collectingAndThen(Collectors.toList(), DataTypes::ROW));
+
+        // Resolve metadata and make sure to filter out metadata not in the producedDataType
+        List<String> metadataKeys =
+                (this.metadataKeys == null) ? Collections.emptyList() : this.metadataKeys;
+        metadataKeys =
+                DataType.getFieldNames(producedDataType).stream()
+                        .filter(metadataKeys::contains)
+                        .collect(Collectors.toList());
+        List<ReadableFileInfo> metadataToExtract =
+                metadataKeys.stream().map(ReadableFileInfo::resolve).collect(Collectors.toList());
+
+        // Filter out partition columns not in producedDataType
+        List<String> partitionKeysToExtract =
+                DataType.getFieldNames(producedDataType).stream()
+                        .filter(this.partitionKeys::contains)
+                        .collect(Collectors.toList());
+
+        // TODO FLINK-19845 old format factory, to be removed soon. The old factory doesn't support
+        //  metadata.
+        if (formatFactory != null) {
+            if (!metadataToExtract.isEmpty()) {
+                throw new IllegalStateException(
+                        "Metadata are not supported for format factories using FileSystemFormatFactory");
             }
-            BulkFormat<RowData, FileSourceSplit> bulkFormat =
-                    bulkReaderFormat.createRuntimeDecoder(scanContext, getProjectedDataType());
-            return createSourceProvider(bulkFormat);
-        } else if (formatFactory != null) {
+
             // The ContinuousFileMonitoringFunction can not accept multiple paths. Default
             // StreamEnv.createInput will create continuous function.
             // Avoid using ContinuousFileMonitoringFunction.
             return SourceFunctionProvider.of(
                     new InputFormatSourceFunction<>(
                             getInputFormat(),
-                            InternalTypeInfo.of(getProjectedDataType().getLogicalType())),
+                            InternalTypeInfo.of(producedDataType.getLogicalType())),
                     true);
+        }
+
+        if (bulkReaderFormat != null) {
+            if (bulkReaderFormat instanceof BulkDecodingFormat
+                    && filters != null
+                    && filters.size() > 0) {
+                ((BulkDecodingFormat<RowData>) bulkReaderFormat).applyFilters(filters);
+            }
+            BulkFormat<RowData, FileSourceSplit> bulkFormat =
+                    wrapBulkFormat(
+                            bulkReaderFormat.createRuntimeDecoder(scanContext, physicalDataType),
+                            producedDataType,
+                            metadataToExtract,
+                            partitionKeysToExtract);
+            return createSourceProvider(bulkFormat);
         } else if (deserializationFormat != null) {
-            // NOTE, we need pass full format types to deserializationFormat
             DeserializationSchema<RowData> decoder =
-                    deserializationFormat.createRuntimeDecoder(
-                            scanContext, getPhysicalDataTypeWithoutPartitionColumns());
-            return createSourceProvider(
-                    new DeserializationSchemaAdapter(
-                            decoder,
-                            getPhysicalDataType(),
-                            readFields(),
-                            partitionKeys,
-                            defaultPartName));
-            // return sourceProvider(wrapDeserializationFormat(deserializationFormat), scanContext);
+                    deserializationFormat.createRuntimeDecoder(scanContext, physicalDataType);
+            BulkFormat<RowData, FileSourceSplit> bulkFormat =
+                    wrapBulkFormat(
+                            new DeserializationSchemaAdapter(decoder),
+                            producedDataType,
+                            metadataToExtract,
+                            partitionKeysToExtract);
+            return createSourceProvider(bulkFormat);
         } else {
             throw new TableException("Can not find format factory.");
         }
     }
 
+    /**
+     * Wraps the given bulk format in a {@link FileInfoExtractorBulkFormat} and a
+     * {@link LimitableBulkFormat}, if needed.
+     */
+    private BulkFormat<RowData, FileSourceSplit> wrapBulkFormat(
+            BulkFormat<RowData, FileSourceSplit> bulkFormat,
+            DataType producedDataType,
+            List<ReadableFileInfo> metadata,
+            List<String> partitionKeys) {
+        if (!metadata.isEmpty() || !partitionKeys.isEmpty()) {
+            bulkFormat =
+                    new FileInfoExtractorBulkFormat(
+                            bulkFormat,
+                            producedDataType,
+                            metadata.stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    ReadableFileInfo::getKey,
+                                                    ReadableFileInfo::getAccessor)),
+                            partitionKeys,
+                            defaultPartName);
+        }
+        bulkFormat = LimitableBulkFormat.create(bulkFormat, limit);
+        return bulkFormat;
+    }
+
     private SourceProvider createSourceProvider(BulkFormat<RowData, FileSourceSplit> bulkFormat) {
-        FileSource.FileSourceBuilder<RowData> builder =
-                FileSource.forBulkFileFormat(
-                        LimitableBulkFormat.create(bulkFormat, limit), paths());
-        return SourceProvider.of(builder.build());
+        return SourceProvider.of(FileSource.forBulkFileFormat(bulkFormat, paths()).build());
     }
 
     private Path[] paths() {
@@ -195,7 +272,12 @@ public class FileSystemTableSource extends AbstractFileSystemTable
 
                     @Override
                     public int[] getProjectFields() {
-                        return readFields();
+                        return projectFields == null
+                                ? IntStream.range(0, DataType.getFieldCount(getPhysicalDataType()))
+                                        .toArray()
+                                : Arrays.stream(projectFields)
+                                        .mapToInt(array -> array[0])
+                                        .toArray();
                     }
 
                     @Override
@@ -271,19 +353,17 @@ public class FileSystemTableSource extends AbstractFileSystemTable
     }
 
     @Override
-    public void applyProjection(int[][] projectedFields, DataType producedDataType) {
-        this.projectedFields = projectedFields;
-    }
-
-    @Override
     public FileSystemTableSource copy() {
         FileSystemTableSource source =
                 new FileSystemTableSource(
                         context, bulkReaderFormat, deserializationFormat, formatFactory);
-        source.projectedFields = projectedFields;
+        source.partitionKeys = partitionKeys;
         source.remainingPartitions = remainingPartitions;
         source.filters = filters;
         source.limit = limit;
+        source.projectFields = projectFields;
+        source.metadataKeys = metadataKeys;
+        source.producedDataType = producedDataType;
         return source;
     }
 
@@ -314,20 +394,84 @@ public class FileSystemTableSource extends AbstractFileSystemTable
         return map;
     }
 
-    private int[] readFields() {
-        return projectedFields == null
-                ? IntStream.range(0, DataType.getFieldCount(getPhysicalDataType())).toArray()
-                : Arrays.stream(projectedFields).mapToInt(array -> array[0]).toArray();
+    // --------------------------------------------------------------------------------------------
+    // Methods to apply projections and metadata,
+    // will influence the final output and physical type used by formats
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public void applyProjection(int[][] projectedFields, DataType producedDataType) {
+        this.projectFields = projectedFields;
+        this.producedDataType = producedDataType;
     }
 
-    private DataType getProjectedDataType() {
-        final DataType physicalDataType = super.getPhysicalDataType();
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        this.metadataKeys = metadataKeys;
+        this.producedDataType = producedDataType;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        return Arrays.stream(ReadableFileInfo.values())
+                .collect(Collectors.toMap(ReadableFileInfo::getKey, ReadableFileInfo::getDataType));
+    }
+
+    interface FileInfoAccessor extends Serializable {
+        /**
+         * Access the information from the {@link FileSourceSplit}. The return value type must be
+         * an internal type.
+         */
+        Object getValue(FileSourceSplit split);
+    }
+
+    enum ReadableFileInfo implements Serializable {
+        FILEPATH(
+                "filepath",
+                DataTypes.STRING().notNull(),
+                new FileInfoAccessor() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object getValue(FileSourceSplit split) {
+                        return StringData.fromString(split.path().getPath());
+                    }
+                });
+
+        final String key;
+        final DataType dataType;
+        final FileInfoAccessor converter;
+
+        ReadableFileInfo(String key, DataType dataType, FileInfoAccessor converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.converter = converter;
+        }
+
+        public String getKey() {
+            return key;
+        }
+
+        public DataType getDataType() {
+            return dataType;
+        }
+
+        public FileInfoAccessor getAccessor() {
+            return converter;
+        }
 
-        // If we haven't projected fields, we just return the original physical data type,
-        // otherwise we need to compute the physical data type depending on the projected fields.
-        if (projectedFields == null) {
-            return physicalDataType;
+        public static ReadableFileInfo resolve(String key) {
+            return Arrays.stream(ReadableFileInfo.values())
+                    .filter(readableFileInfo -> readableFileInfo.getKey().equals(key))
+                    .findFirst()
+                    .orElseThrow(
+                            () ->
+                                    new IllegalArgumentException(
+                                            "Cannot resolve the provided ReadableMetadata key"));
         }
-        return DataType.projectFields(physicalDataType, projectedFields);
     }
 }
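
A detail of the `SupportsReadingMetadata` plumbing above that is easy to miss: the requested metadata keys and partition keys are re-filtered against the produced schema (where, per the interface contract, metadata columns come last), so anything projected away is dropped and the extraction order follows the produced field order. A plain-Java sketch of that filtering, with invented field names:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MetadataResolutionSketch {
    public static void main(String[] args) {
        // Produced type after projection: two physical fields, one partition field,
        // and the metadata column appended at the end.
        List<String> producedFieldNames = Arrays.asList("id", "name", "dt", "filepath");
        List<String> requestedMetadataKeys = Arrays.asList("filepath");
        List<String> partitionKeys = Arrays.asList("dt");

        List<String> metadataToExtract =
                producedFieldNames.stream()
                        .filter(requestedMetadataKeys::contains)
                        .collect(Collectors.toList());
        List<String> partitionKeysToExtract =
                producedFieldNames.stream()
                        .filter(partitionKeys::contains)
                        .collect(Collectors.toList());

        System.out.println(metadataToExtract);      // prints: [filepath]
        System.out.println(partitionKeysToExtract); // prints: [dt]
    }
}
```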
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java
index aae8e4c..dbec987 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java
@@ -43,25 +43,25 @@ import java.util.List;
  */
 public class TestCsvDeserializationSchema implements DeserializationSchema<RowData> {
 
-    @SuppressWarnings("rawtypes")
-    private final DataStructureConverter[] csvRowToRowDataConverters;
+    private final List<DataType> physicalFieldTypes;
+    private final int physicalFieldCount;
 
     private final TypeInformation<RowData> typeInfo;
-    private final int fieldCount;
-    private final List<DataType> fieldTypes;
+    private final int[] indexMapping;
 
-    private transient FieldParser<?>[] fieldParsers;
+    @SuppressWarnings("rawtypes")
+    private transient DataStructureConverter[] csvRowToRowDataConverters;
 
-    public TestCsvDeserializationSchema(DataType dataType) {
-        this.fieldTypes = DataType.getFieldDataTypes(dataType);
-        this.fieldCount = fieldTypes.size();
+    private transient FieldParser<?>[] fieldParsers;
 
-        this.csvRowToRowDataConverters =
-                fieldTypes.stream()
-                        .map(DataStructureConverters::getConverter)
-                        .toArray(DataStructureConverter[]::new);
+    public TestCsvDeserializationSchema(DataType physicalDataType, List<String> orderedCsvColumns) {
+        this.physicalFieldTypes = DataType.getFieldDataTypes(physicalDataType);
+        this.physicalFieldCount = physicalFieldTypes.size();
+        this.typeInfo = InternalTypeInfo.of((RowType) physicalDataType.getLogicalType());
 
-        this.typeInfo = InternalTypeInfo.of((RowType) dataType.getLogicalType());
+        List<String> physicalFieldNames = DataType.getFieldNames(physicalDataType);
+        this.indexMapping =
+                orderedCsvColumns.stream().mapToInt(physicalFieldNames::indexOf).toArray();
 
         initFieldParsers();
     }
@@ -74,14 +74,18 @@ public class TestCsvDeserializationSchema implements DeserializationSchema<RowDa
     @SuppressWarnings("unchecked")
     @Override
     public RowData deserialize(byte[] message) throws IOException {
-        GenericRowData row = new GenericRowData(fieldCount);
+        GenericRowData row = new GenericRowData(physicalFieldCount);
         int startIndex = 0;
-        for (int i = 0; i < fieldCount; i++) {
+        for (int csvColumn = 0; csvColumn < indexMapping.length; csvColumn++) {
             startIndex =
-                    this.fieldParsers[i].resetErrorStateAndParse(
+                    fieldParsers[csvColumn].resetErrorStateAndParse(
                             message, startIndex, message.length, new byte[] {','}, null);
-            row.setField(
-                    i, csvRowToRowDataConverters[i].toInternal(fieldParsers[i].getLastResult()));
+            if (indexMapping[csvColumn] != -1) {
+                row.setField(
+                        indexMapping[csvColumn],
+                        csvRowToRowDataConverters[csvColumn].toInternal(
+                                fieldParsers[csvColumn].getLastResult()));
+            }
         }
         return row;
     }
@@ -97,9 +101,19 @@ public class TestCsvDeserializationSchema implements DeserializationSchema<RowDa
     }
 
     private void initFieldParsers() {
-        this.fieldParsers = new FieldParser<?>[fieldCount];
-        for (int i = 0; i < fieldTypes.size(); i++) {
-            DataType fieldType = fieldTypes.get(i);
+        int csvRowLength = indexMapping.length;
+        this.fieldParsers = new FieldParser<?>[csvRowLength];
+        this.csvRowToRowDataConverters = new DataStructureConverter[csvRowLength];
+        for (int csvColumn = 0; csvColumn < csvRowLength; csvColumn++) {
+            if (indexMapping[csvColumn] == -1) {
+                // The output type doesn't include this field, so just assign a string parser to
+                // skip it
+                this.fieldParsers[csvColumn] =
+                        InstantiationUtil.instantiate(
+                                FieldParser.getParserForType(String.class), FieldParser.class);
+                continue;
+            }
+            DataType fieldType = physicalFieldTypes.get(indexMapping[csvColumn]);
             Class<? extends FieldParser<?>> parserType =
                     FieldParser.getParserForType(
                             logicalTypeRootToFieldParserClass(
@@ -110,7 +124,9 @@ public class TestCsvDeserializationSchema implements DeserializationSchema<RowDa
 
             FieldParser<?> p = InstantiationUtil.instantiate(parserType, FieldParser.class);
 
-            this.fieldParsers[i] = p;
+            this.fieldParsers[csvColumn] = p;
+            this.csvRowToRowDataConverters[csvColumn] =
+                    DataStructureConverters.getConverter(fieldType);
         }
     }
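
The index mapping built from `orderedCsvColumns` above works as follows: CSV columns are parsed in file order, each parsed value is written to the physical field position it maps to, and a mapping of -1 (a column present in the file but not in the possibly projected physical type) is parsed only to advance past it. A standalone sketch with invented column names and plain strings in place of Flink's converters:

```java
import java.util.Arrays;
import java.util.List;

public class CsvIndexMappingSketch {
    public static void main(String[] args) {
        // Physical (possibly projected) schema order vs. column order in the CSV file;
        // "details" exists in the file but was projected away.
        List<String> physicalFieldNames = Arrays.asList("id", "name");
        List<String> orderedCsvColumns = Arrays.asList("id", "details", "name");

        int[] indexMapping =
                orderedCsvColumns.stream().mapToInt(physicalFieldNames::indexOf).toArray();
        System.out.println(Arrays.toString(indexMapping)); // prints: [0, -1, 1]

        String[] csvRow = "7,foo,alice".split(",");
        Object[] physicalRow = new Object[physicalFieldNames.size()];
        for (int csvColumn = 0; csvColumn < indexMapping.length; csvColumn++) {
            if (indexMapping[csvColumn] != -1) {
                physicalRow[indexMapping[csvColumn]] = csvRow[csvColumn];
            }
        }
        System.out.println(Arrays.toString(physicalRow)); // prints: [7, alice]
    }
}
```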
 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
index 5595b87..03a80e7 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
@@ -43,7 +43,9 @@ import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_FIELD_DELIMITER;
 import static org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_LINE_DELIMITER;
@@ -118,11 +120,24 @@ public class TestCsvFileSystemFormatFactory
     @Override
     public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
             DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+        List<String> schemaFields =
+                DataType.getFieldNames(context.getPhysicalRowDataType()).stream()
+                        .filter(
+                                field ->
+                                        !context.getCatalogTable()
+                                                .getPartitionKeys()
+                                                .contains(field))
+                        .collect(Collectors.toList());
         return new DecodingFormat<DeserializationSchema<RowData>>() {
             @Override
             public DeserializationSchema<RowData> createRuntimeDecoder(
                     DynamicTableSource.Context context, DataType physicalDataType) {
-                return new TestCsvDeserializationSchema(physicalDataType);
+                // TestCsvDeserializationSchema has no knowledge of the field names, and the
+                // implicit assumption made by the tests is that the CSV rows are composed of only
+                // the physical fields (excluding partition fields), in the same order as defined
+                // in the table declaration. This is why TestCsvDeserializationSchema needs
+                // schemaFields.
+                return new TestCsvDeserializationSchema(physicalDataType, schemaFields);
             }
 
             @Override