Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/22 07:21:54 UTC

[GitHub] [flink] slinkydeveloper commented on a change in pull request #17544: [FLINK-24165][table] Add infrastructure to support metadata for filesystem connector

slinkydeveloper commented on a change in pull request #17544:
URL: https://github.com/apache/flink/pull/17544#discussion_r734292104



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -330,4 +386,89 @@ private DataType getProjectedDataType() {
         }
         return DataType.projectFields(physicalDataType, projectedFields);
     }
+
+    @Override
+    DataType getPhysicalDataTypeWithoutPartitionColumns() {
+        if (this.producedDataType != null) {
+            return DataTypes.ROW(
+                    DataType.getFields(this.producedDataType).stream()
+                            .filter(field -> !usedMetadataKeys.contains(field.getName()))
+                            .filter(field -> !partitionKeys.contains(field.getName()))
+                            .toArray(DataTypes.Field[]::new));
+        }
+        return super.getPhysicalDataTypeWithoutPartitionColumns();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        return Arrays.stream(ReadableFileInfo.values())
+                .collect(Collectors.toMap(ReadableFileInfo::getKey, ReadableFileInfo::getDataType));
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        if (metadataKeys.isEmpty()) {
+            // This method should be idempotent
+            this.usedMetadataKeys = null;
+            this.usedMetadata = null;
+            this.producedDataType = null;
+            return;
+        }
+
+        this.usedMetadataKeys = metadataKeys;
+        this.usedMetadata =
+                metadataKeys.stream().map(ReadableFileInfo::resolve).collect(Collectors.toList());
+        this.producedDataType = producedDataType;
+    }
+
+    interface FileInfoAccessor extends Serializable {
+        /**
+         * Access the information from the {@link org.apache.flink.core.fs.FileInputSplit}. The
+         * return value type must be an internal type.
+         */
+        Object getValue(FileSourceSplit split);
+    }
+
+    enum ReadableFileInfo implements Serializable {
+        FILENAME(
+                "filename",

Review comment:
       Perhaps we should name this `filepath` or just `path`? From `filename` I would expect the file name including the extension, but excluding the whole path.
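
       For illustration, a minimal sketch of the distinction being raised here, using the JDK path API rather than the connector's own classes (so the accessor names below are not the ones from this PR):

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class FilenameVsPath {
    public static void main(String[] args) {
        // Hypothetical file produced by the filesystem connector.
        Path p = Paths.get("/data/warehouse/dt=2021-10-22/part-0001.parquet");

        // What a reader would likely expect from a column named "filename":
        // only the last path component, extension included.
        System.out.println(p.getFileName()); // part-0001.parquet

        // What a column named "filepath" or "path" would suggest instead:
        // the whole path to the file.
        System.out.println(p); // /data/warehouse/dt=2021-10-22/part-0001.parquet
    }
}
```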

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -315,13 +340,44 @@ public String asSummaryString() {
     }
 
     private int[] readFields() {
+        if (this.producedDataType != null) {
+            return IntStream.range(
+                            0,
+                            (int)
+                                    DataType.getFields(this.producedDataType).stream()
+                                            .filter(
+                                                    field ->
+                                                            !usedMetadataKeys.contains(
+                                                                    field.getName()))
+                                            .count())
+                    .toArray();
+        }
         return projectedFields == null
                 ? IntStream.range(0, DataType.getFieldCount(getPhysicalDataType())).toArray()
                 : Arrays.stream(projectedFields).mapToInt(array -> array[0]).toArray();
     }
 
+    @Override
+    DataType getPhysicalDataType() {
+        if (this.usedMetadataKeys != null) {
+            return DataTypes.ROW(
+                    DataType.getFields(super.getPhysicalDataType()).stream()
+                            .filter(field -> !usedMetadataKeys.contains(field.getName()))
+                            .toArray(DataTypes.Field[]::new));
+        }
+        return super.getPhysicalDataType();
+    }
+
     private DataType getProjectedDataType() {
-        final DataType physicalDataType = super.getPhysicalDataType();
+        final DataType physicalDataType =
+                this.producedDataType != null
+                        ? DataTypes.ROW(
+                                DataType.getFields(this.producedDataType).stream()
+                                        .filter(
+                                                field ->
+                                                        !usedMetadataKeys.contains(field.getName()))
+                                        .toArray(DataTypes.Field[]::new))
+                        : super.getPhysicalDataType();

Review comment:
       All these changes are unfortunately necessary to support all the different combinations of types our formats now expect: only physical columns, physical columns + partition columns, projected columns + physical columns, and then all of those combinations with metadata as well. All of this code will hopefully disappear with https://issues.apache.org/jira/browse/FLINK-24617
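
       As a rough, hypothetical sketch of the combinations mentioned above (the field names are made up and this is not the connector's actual code): the row types range from the purely physical columns stored in the files, to the schema including partition columns, to the producedDataType handed over by applyReadableMetadata() with the requested metadata columns appended at the end, which is why the overrides in this class keep filtering by usedMetadataKeys and partitionKeys.

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class RowTypeCombinations {
    public static void main(String[] args) {
        // 1. Only physical columns, i.e. what the format actually reads from the files.
        DataType physical =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("name", DataTypes.STRING()));

        // 2. Physical + partition columns: "dt" comes from the directory layout,
        //    not from the file contents.
        DataType withPartitions =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("dt", DataTypes.STRING()));

        // 3. producedDataType after applyReadableMetadata(): the requested metadata
        //    columns (e.g. "filename") are appended at the end, so the overrides in
        //    FileSystemTableSource have to strip them again to recover the physical type.
        DataType produced =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("dt", DataTypes.STRING()),
                        DataTypes.FIELD("filename", DataTypes.STRING()));

        System.out.println(physical);
        System.out.println(withPartitions);
        System.out.println(produced);
    }
}
```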



