You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/22 09:43:18 UTC

[GitHub] [flink] twalthr commented on a change in pull request #17544: [FLINK-24165][table] Add infrastructure to support metadata for filesystem connector

twalthr commented on a change in pull request #17544:
URL: https://github.com/apache/flink/pull/17544#discussion_r734367759



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ExtendedRowData.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * An implementation of {@link RowData} which is backed by two {@link RowData} with a well-defined
+ * index mapping, One of the rows is fixed, while the other can be swapped for performant changes in
+ * hot code paths. The {@link RowKind} is inherited from the mutable row.
+ */
+@PublicEvolving
+public class ExtendedRowData implements RowData {

Review comment:
       maybe `EnrichedRowData`? When I read "extended" it sounds like we add fields at the end.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ExtendedRowData.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * An implementation of {@link RowData} which is backed by two {@link RowData} with a well-defined
+ * index mapping, One of the rows is fixed, while the other can be swapped for performant changes in
+ * hot code paths. The {@link RowKind} is inherited from the mutable row.
+ */
+@PublicEvolving
+public class ExtendedRowData implements RowData {
+
+    private final RowData fixedRow;
+    // The index mapping is built as follows: positive indexes are indexes refer to mutable row
+    // positions,
+    // while negative indexes (with -1 offset) refer to fixed row positions.
+    // For example an index mapping [0, 1, -1, -2, 2] means:
+    // * Index 0 -> mutable row index 0
+    // * Index 1 -> mutable row index 1
+    // * Index -1 -> fixed row index 0
+    // * Index -2 -> fixed row index 1
+    // * Index 2 -> mutable row index 2
+    private final int[] indexMapping;
+
+    private RowData mutableRow;
+
+    public ExtendedRowData(RowData fixedRow, int[] indexMapping) {
+        this.fixedRow = fixedRow;
+        this.indexMapping = indexMapping;
+    }
+
+    /**
+     * Replaces the mutable {@link RowData} backing this {@link ExtendedRowData}.
+     *
+     * <p>This method replaces the mutable row data in place and does not return a new object. This
+     * is done for performance reasons.
+     */
+    public ExtendedRowData replaceMutableRow(RowData mutableRow) {
+        this.mutableRow = mutableRow;
+        return this;
+    }
+
+    // ---------------------------------------------------------------------------------------------
+
+    @Override
+    public int getArity() {
+        return indexMapping.length;
+    }
+
+    @Override
+    public RowKind getRowKind() {
+        return this.mutableRow.getRowKind();

Review comment:
       nit: `this.` can be removed

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala
##########
@@ -56,6 +56,28 @@ abstract class StreamFileSystemITCaseBase extends StreamingTestBase with FileSys
       sink.getAppendResults.sorted)
   }
 
+  override def checkPredicate(sqlQuery: String, checkFunc: Row => Unit): Unit = {
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]

Review comment:
       `toAppendStream` is deprecated use `toDataStream` instead

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -330,4 +386,89 @@ private DataType getProjectedDataType() {
         }
         return DataType.projectFields(physicalDataType, projectedFields);
     }
+
+    @Override
+    DataType getPhysicalDataTypeWithoutPartitionColumns() {
+        if (this.producedDataType != null) {
+            return DataTypes.ROW(
+                    DataType.getFields(this.producedDataType).stream()
+                            .filter(field -> !usedMetadataKeys.contains(field.getName()))
+                            .filter(field -> !partitionKeys.contains(field.getName()))
+                            .toArray(DataTypes.Field[]::new));
+        }
+        return super.getPhysicalDataTypeWithoutPartitionColumns();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        return Arrays.stream(ReadableFileInfo.values())
+                .collect(Collectors.toMap(ReadableFileInfo::getKey, ReadableFileInfo::getDataType));
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        if (metadataKeys.isEmpty()) {
+            // This method should be idempotent
+            this.usedMetadataKeys = null;
+            this.usedMetadata = null;
+            this.producedDataType = null;
+            return;
+        }
+
+        this.usedMetadataKeys = metadataKeys;
+        this.usedMetadata =
+                metadataKeys.stream().map(ReadableFileInfo::resolve).collect(Collectors.toList());
+        this.producedDataType = producedDataType;
+    }
+
+    interface FileInfoAccessor extends Serializable {
+        /**
+         * Access the information from the {@link org.apache.flink.core.fs.FileInputSplit}. The
+         * return value type must be an internal type.
+         */
+        Object getValue(FileSourceSplit split);
+    }
+
+    enum ReadableFileInfo implements Serializable {
+        FILENAME(
+                "filename",

Review comment:
       +1 for filepath

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileInfoExtractorBulkFormat.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ExtendedRowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * This {@link BulkFormat} is a wrapper that attaches file information columns to the output
+ * records.
+ */
+class FileInfoExtractorBulkFormat implements BulkFormat<RowData, FileSourceSplit> {
+
+    private final BulkFormat<RowData, FileSourceSplit> wrapped;
+    private final TypeInformation<RowData> producedType;
+
+    private final List<FileSystemTableSource.FileInfoAccessor> metadataColumnsFunctions;
+    private final int[] extendedRowIndexMapping;
+
+    public FileInfoExtractorBulkFormat(
+            BulkFormat<RowData, FileSourceSplit> wrapped,
+            DataType fullDataType,
+            Map<String, FileSystemTableSource.FileInfoAccessor> metadataColumns) {
+        this.wrapped = wrapped;
+        this.producedType = InternalTypeInfo.of(fullDataType.getLogicalType());
+
+        // Compute index mapping for the extended row and the functions to compute metadata
+        List<String> completeRowFields = DataType.getFieldNames(fullDataType);
+        List<String> mutableRowFields =
+                completeRowFields.stream()
+                        .filter(key -> !metadataColumns.containsKey(key))
+                        .collect(Collectors.toList());
+        List<String> fixedRowFields = new ArrayList<>(metadataColumns.keySet());
+        this.extendedRowIndexMapping =
+                ExtendedRowData.computeIndexMapping(
+                        completeRowFields, mutableRowFields, fixedRowFields);
+        this.metadataColumnsFunctions =
+                fixedRowFields.stream().map(metadataColumns::get).collect(Collectors.toList());
+    }
+
+    @Override
+    public Reader<RowData> createReader(Configuration config, FileSourceSplit split)
+            throws IOException {
+        return wrapReader(wrapped.createReader(config, split), split);
+    }
+
+    @Override
+    public Reader<RowData> restoreReader(Configuration config, FileSourceSplit split)
+            throws IOException {
+        return wrapReader(wrapped.restoreReader(config, split), split);
+    }
+
+    @Override
+    public boolean isSplittable() {
+        return wrapped.isSplittable();
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return producedType;
+    }
+
+    private Reader<RowData> wrapReader(Reader<RowData> superReader, FileSourceSplit split) {
+        // Fill the metadata row
+        final GenericRowData metadataRowData = new GenericRowData(metadataColumnsFunctions.size());
+        for (int i = 0; i < metadataColumnsFunctions.size(); i++) {
+            metadataRowData.setField(i, metadataColumnsFunctions.get(i).getValue(split));
+        }
+
+        // This row is going to be reused for every record
+        final ExtendedRowData extendedRowData =
+                new ExtendedRowData(metadataRowData, this.extendedRowIndexMapping);
+
+        return new ReaderWrapper(
+                superReader,
+                physicalRowData -> {

Review comment:
       Let's move this into ReaderWrapper directly. The reusable record should be visible as a member variable of `ReaderWrapper` instead of implicitly accessing a local variable in the lambda.

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -315,13 +340,44 @@ public String asSummaryString() {
     }
 
     private int[] readFields() {
+        if (this.producedDataType != null) {
+            return IntStream.range(
+                            0,
+                            (int)
+                                    DataType.getFields(this.producedDataType).stream()
+                                            .filter(
+                                                    field ->
+                                                            !usedMetadataKeys.contains(
+                                                                    field.getName()))
+                                            .count())
+                    .toArray();
+        }
         return projectedFields == null
                 ? IntStream.range(0, DataType.getFieldCount(getPhysicalDataType())).toArray()
                 : Arrays.stream(projectedFields).mapToInt(array -> array[0]).toArray();
     }
 
+    @Override
+    DataType getPhysicalDataType() {
+        if (this.usedMetadataKeys != null) {
+            return DataTypes.ROW(
+                    DataType.getFields(super.getPhysicalDataType()).stream()
+                            .filter(field -> !usedMetadataKeys.contains(field.getName()))
+                            .toArray(DataTypes.Field[]::new));
+        }
+        return super.getPhysicalDataType();
+    }
+
     private DataType getProjectedDataType() {
-        final DataType physicalDataType = super.getPhysicalDataType();
+        final DataType physicalDataType =
+                this.producedDataType != null

Review comment:
       can't we just store `physicalDataType` in a separate variable instead of recomputing it multiple times? both `physialDataType` and `producedDataType` should be member variables. Please take `KafkaDynamicSource` as a reference implementation, also for structuring the member variables.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ExtendedRowData.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * An implementation of {@link RowData} which is backed by two {@link RowData} with a well-defined
+ * index mapping, One of the rows is fixed, while the other can be swapped for performant changes in
+ * hot code paths. The {@link RowKind} is inherited from the mutable row.
+ */
+@PublicEvolving
+public class ExtendedRowData implements RowData {
+
+    private final RowData fixedRow;
+    // The index mapping is built as follows: positive indexes are indexes refer to mutable row
+    // positions,
+    // while negative indexes (with -1 offset) refer to fixed row positions.
+    // For example an index mapping [0, 1, -1, -2, 2] means:
+    // * Index 0 -> mutable row index 0
+    // * Index 1 -> mutable row index 1
+    // * Index -1 -> fixed row index 0
+    // * Index -2 -> fixed row index 1
+    // * Index 2 -> mutable row index 2
+    private final int[] indexMapping;

Review comment:
       I feel this can be quite difficult to use due to `-1 -> fixed row index 0` and `Index 0 -> mutable row index 0`. How about we introduce a helper POJO `RowMapping<RowType.LEFT, 2>` and `RowMapping<RowType.RIGHT, 3>`. Or we simply make both indexes 1 based and skip the 0.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ExtendedRowData.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * An implementation of {@link RowData} which is backed by two {@link RowData} with a well-defined
+ * index mapping, One of the rows is fixed, while the other can be swapped for performant changes in
+ * hot code paths. The {@link RowKind} is inherited from the mutable row.
+ */
+@PublicEvolving
+public class ExtendedRowData implements RowData {
+
+    private final RowData fixedRow;
+    // The index mapping is built as follows: positive indexes are indexes refer to mutable row
+    // positions,
+    // while negative indexes (with -1 offset) refer to fixed row positions.
+    // For example an index mapping [0, 1, -1, -2, 2] means:
+    // * Index 0 -> mutable row index 0
+    // * Index 1 -> mutable row index 1
+    // * Index -1 -> fixed row index 0
+    // * Index -2 -> fixed row index 1
+    // * Index 2 -> mutable row index 2
+    private final int[] indexMapping;
+
+    private RowData mutableRow;
+
+    public ExtendedRowData(RowData fixedRow, int[] indexMapping) {
+        this.fixedRow = fixedRow;
+        this.indexMapping = indexMapping;
+    }
+
+    /**
+     * Replaces the mutable {@link RowData} backing this {@link ExtendedRowData}.
+     *
+     * <p>This method replaces the mutable row data in place and does not return a new object. This
+     * is done for performance reasons.
+     */
+    public ExtendedRowData replaceMutableRow(RowData mutableRow) {
+        this.mutableRow = mutableRow;
+        return this;
+    }
+
+    // ---------------------------------------------------------------------------------------------
+
+    @Override
+    public int getArity() {
+        return indexMapping.length;
+    }
+
+    @Override
+    public RowKind getRowKind() {
+        return this.mutableRow.getRowKind();
+    }
+
+    @Override
+    public void setRowKind(RowKind kind) {
+        this.mutableRow.setRowKind(kind);
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.isNullAt(index);
+        } else {
+            return fixedRow.isNullAt(-(index + 1));
+        }
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getBoolean(index);
+        } else {
+            return fixedRow.getBoolean(-(index + 1));
+        }
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getByte(index);
+        } else {
+            return fixedRow.getByte(-(index + 1));
+        }
+    }
+
+    @Override
+    public short getShort(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getShort(index);
+        } else {
+            return fixedRow.getShort(-(index + 1));
+        }
+    }
+
+    @Override
+    public int getInt(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getInt(index);
+        } else {
+            return fixedRow.getInt(-(index + 1));
+        }
+    }
+
+    @Override
+    public long getLong(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getLong(index);
+        } else {
+            return fixedRow.getLong(-(index + 1));
+        }
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getFloat(index);
+        } else {
+            return fixedRow.getFloat(-(index + 1));
+        }
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getDouble(index);
+        } else {
+            return fixedRow.getDouble(-(index + 1));
+        }
+    }
+
+    @Override
+    public StringData getString(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getString(index);
+        } else {
+            return fixedRow.getString(-(index + 1));
+        }
+    }
+
+    @Override
+    public DecimalData getDecimal(int pos, int precision, int scale) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getDecimal(index, precision, scale);
+        } else {
+            return fixedRow.getDecimal(-(index + 1), precision, scale);
+        }
+    }
+
+    @Override
+    public TimestampData getTimestamp(int pos, int precision) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getTimestamp(index, precision);
+        } else {
+            return fixedRow.getTimestamp(-(index + 1), precision);
+        }
+    }
+
+    @Override
+    public <T> RawValueData<T> getRawValue(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getRawValue(index);
+        } else {
+            return fixedRow.getRawValue(-(index + 1));
+        }
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getBinary(index);
+        } else {
+            return fixedRow.getBinary(-(index + 1));
+        }
+    }
+
+    @Override
+    public ArrayData getArray(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getArray(index);
+        } else {
+            return fixedRow.getArray(-(index + 1));
+        }
+    }
+
+    @Override
+    public MapData getMap(int pos) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getMap(index);
+        } else {
+            return fixedRow.getMap(-(index + 1));
+        }
+    }
+
+    @Override
+    public RowData getRow(int pos, int numFields) {
+        int index = indexMapping[pos];
+        if (index >= 0) {
+            return mutableRow.getRow(index, numFields);
+        } else {
+            return fixedRow.getRow(-(index + 1), numFields);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ExtendedRowData that = (ExtendedRowData) o;
+        return Objects.equals(this.fixedRow, that.fixedRow)
+                && Objects.equals(this.mutableRow, that.mutableRow);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(fixedRow, mutableRow);
+    }
+
+    @Override
+    public String toString() {
+        return mutableRow.getRowKind().shortString()
+                + "{"
+                + "fixedRow="
+                + fixedRow
+                + ", mutableRow="
+                + mutableRow
+                + '}';
+    }
+
+    /**
+     * Creates a new {@link ExtendedRowData} with the provided {@code fixedRow} as the immutable
+     * static row, and uses the {@code completeRowFields}, {@code fixedRowFields} and {@code
+     * mutableRowFields} arguments to compute the indexes mapping.
+     */
+    public static ExtendedRowData from(
+            RowData fixedRow,
+            List<String> completeRowFields,

Review comment:
       can you add more docs for the paramters or an example? at first glance it is might not be obvious what `completeRowFields` means.

##########
File path: flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/utils/ExtendedRowDataTest.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.utils;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for {@link JoinedRowData}. */
+public class ExtendedRowDataTest {
+
+    @Test
+    public void testJoinedRows() {
+        List<String> completeRowFields =

Review comment:
       nit: can also be final

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -330,4 +386,89 @@ private DataType getProjectedDataType() {
         }
         return DataType.projectFields(physicalDataType, projectedFields);
     }
+
+    @Override
+    DataType getPhysicalDataTypeWithoutPartitionColumns() {
+        if (this.producedDataType != null) {
+            return DataTypes.ROW(
+                    DataType.getFields(this.producedDataType).stream()
+                            .filter(field -> !usedMetadataKeys.contains(field.getName()))
+                            .filter(field -> !partitionKeys.contains(field.getName()))
+                            .toArray(DataTypes.Field[]::new));
+        }
+        return super.getPhysicalDataTypeWithoutPartitionColumns();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        return Arrays.stream(ReadableFileInfo.values())
+                .collect(Collectors.toMap(ReadableFileInfo::getKey, ReadableFileInfo::getDataType));
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        if (metadataKeys.isEmpty()) {
+            // This method should be idempotent
+            this.usedMetadataKeys = null;
+            this.usedMetadata = null;
+            this.producedDataType = null;
+            return;
+        }
+
+        this.usedMetadataKeys = metadataKeys;
+        this.usedMetadata =
+                metadataKeys.stream().map(ReadableFileInfo::resolve).collect(Collectors.toList());
+        this.producedDataType = producedDataType;
+    }
+
+    interface FileInfoAccessor extends Serializable {
+        /**
+         * Access the information from the {@link org.apache.flink.core.fs.FileInputSplit}. The
+         * return value type must be an internal type.
+         */
+        Object getValue(FileSourceSplit split);
+    }
+
+    enum ReadableFileInfo implements Serializable {
+        FILENAME(
+                "filename",
+                DataTypes.STRING().notNull(),
+                split -> StringData.fromString(split.path().getPath()));

Review comment:
       use an anonymous inner class with a serialVersionID (see Kafka source). we had issue in the past because lambdas are not proper serializable when Maven shading is involved.

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileInfoExtractorBulkFormat.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ExtendedRowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * This {@link BulkFormat} is a wrapper that attaches file information columns to the output
+ * records.
+ */
+class FileInfoExtractorBulkFormat implements BulkFormat<RowData, FileSourceSplit> {
+
+    private final BulkFormat<RowData, FileSourceSplit> wrapped;
+    private final TypeInformation<RowData> producedType;
+
+    private final List<FileSystemTableSource.FileInfoAccessor> metadataColumnsFunctions;
+    private final int[] extendedRowIndexMapping;
+
+    public FileInfoExtractorBulkFormat(
+            BulkFormat<RowData, FileSourceSplit> wrapped,
+            DataType fullDataType,
+            Map<String, FileSystemTableSource.FileInfoAccessor> metadataColumns) {
+        this.wrapped = wrapped;
+        this.producedType = InternalTypeInfo.of(fullDataType.getLogicalType());

Review comment:
       at some point we should move this into a connector module, it doesn't belong into `table-runtime`. Is it easily possible to let the type information be created before and just passed into this class?

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -315,13 +340,44 @@ public String asSummaryString() {
     }
 
     private int[] readFields() {
+        if (this.producedDataType != null) {
+            return IntStream.range(
+                            0,
+                            (int)
+                                    DataType.getFields(this.producedDataType).stream()
+                                            .filter(
+                                                    field ->
+                                                            !usedMetadataKeys.contains(
+                                                                    field.getName()))
+                                            .count())
+                    .toArray();
+        }
         return projectedFields == null
                 ? IntStream.range(0, DataType.getFieldCount(getPhysicalDataType())).toArray()
                 : Arrays.stream(projectedFields).mapToInt(array -> array[0]).toArray();
     }
 
+    @Override
+    DataType getPhysicalDataType() {
+        if (this.usedMetadataKeys != null) {
+            return DataTypes.ROW(
+                    DataType.getFields(super.getPhysicalDataType()).stream()
+                            .filter(field -> !usedMetadataKeys.contains(field.getName()))
+                            .toArray(DataTypes.Field[]::new));
+        }
+        return super.getPhysicalDataType();
+    }
+
     private DataType getProjectedDataType() {
-        final DataType physicalDataType = super.getPhysicalDataType();
+        final DataType physicalDataType =
+                this.producedDataType != null
+                        ? DataTypes.ROW(
+                                DataType.getFields(this.producedDataType).stream()
+                                        .filter(
+                                                field ->
+                                                        !usedMetadataKeys.contains(field.getName()))
+                                        .toArray(DataTypes.Field[]::new))
+                        : super.getPhysicalDataType();

Review comment:
       What do you mean with `physical columns + partition columns`? Partition columns are always part of the physical columns, no?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org