Posted to issues@flink.apache.org by "luoyuxia (via GitHub)" <gi...@apache.org> on 2023/03/06 08:53:53 UTC

[GitHub] [flink] luoyuxia commented on a diff in pull request #21290: [FLINK-29980] Handle partition keys directly in hive bulk format

luoyuxia commented on code in PR #21290:
URL: https://github.com/apache/flink/pull/21290#discussion_r1125885836


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileInfoExtractorBulkFormat.java:
##########
@@ -23,95 +23,53 @@
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator;
-import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.utils.PartitionPathUtils;
 
 import java.io.IOException;
-import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * This {@link BulkFormat} is a wrapper that attaches file information columns to the output
  * records.
  */
-class FileInfoExtractorBulkFormat implements BulkFormat<RowData, FileSourceSplit> {
+public class FileInfoExtractorBulkFormat<SplitT extends FileSourceSplit>
+        implements BulkFormat<RowData, SplitT> {
 
-    private final BulkFormat<RowData, FileSourceSplit> wrapped;
-    private final TypeInformation<RowData> producedType;
-
-    private final List<FileSystemTableSource.FileInfoAccessor> metadataColumnsFunctions;
-    private final List<Map.Entry<String, DataType>> partitionColumnTypes;
-    private final int[] extendedRowIndexMapping;
-
-    private final String defaultPartName;
+    private final BulkFormat<RowData, SplitT> wrapped;
+    private final PartitionFieldExtractor<SplitT> partitionFieldExtractor;
+    private final FileInfoExtractor fileInfoExtractor;
+    public final List<FileSystemTableSource.FileInfoAccessor> metadataColumnsFunctions;
+    public final TypeInformation<RowData> producedType;
 
     public FileInfoExtractorBulkFormat(
-            BulkFormat<RowData, FileSourceSplit> wrapped,
+            BulkFormat<RowData, SplitT> wrapped,
             DataType producedDataType,
             TypeInformation<RowData> producedTypeInformation,
             Map<String, FileSystemTableSource.FileInfoAccessor> metadataColumns,
             List<String> partitionColumns,
-            String defaultPartName) {
+            PartitionFieldExtractor<SplitT> partitionFieldExtractor) {

Review Comment:
   Why change this? It seems like just a code refactor.



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileInfoExtractorBulkFormat.java:
##########
@@ -125,41 +83,38 @@ public TypeInformation<RowData> getProducedType() {
         return producedType;
     }
 
-    private Reader<RowData> wrapReader(Reader<RowData> superReader, FileSourceSplit split) {
+    private Reader<RowData> wrapReader(Reader<RowData> superReader, SplitT split) {
         // Fill the metadata + partition columns row
+        List<FileInfoExtractor.PartitionColumn> partitionColumns =

Review Comment:
   Seems like another code refactor. Do we really need to refactor it? Is there any special reason?



##########
flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java:
##########
@@ -58,14 +56,10 @@ OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
                     Configuration hadoopConfig,
                     RowType tableType,
                     List<String> partitionKeys,

Review Comment:
   Then, as the TODO says, the `partitionKeys` code should be pruned.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HivePartitionFieldExtractor.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.connector.file.table.PartitionFieldExtractor;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.connectors.hive.read.HiveTableInputSplit;
+import org.apache.flink.connectors.hive.util.HivePartitionUtils;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+
+/** Partition extractor to extract partition value from {@link HiveSourceSplit}. */
+public class HivePartitionFieldExtractor {
+
+    public static PartitionFieldExtractor<HiveSourceSplit> createForHiveSourceSplit(
+            HiveShim hiveShim, String defaultPartitionName) {
+        return (split, fieldName, fieldType) -> {
+            String valueString = split.getHiveTablePartition().getPartitionSpec().get(fieldName);
+            return HivePartitionUtils.restorePartitionValueFromType(
+                    hiveShim, valueString, fieldType, defaultPartitionName);
+        };
+    }
+
+    public static PartitionFieldExtractor<HiveTableInputSplit> createForHiveTableInputSplit(

Review Comment:
   Do we really need it, given that we already have `createForHiveSourceSplit`?



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java:
##########
@@ -79,9 +76,8 @@ public class HiveInputFormat implements BulkFormat<RowData, HiveSourceSplit> {
     private final DataType[] fieldTypes;
     private final String hiveVersion;
     private final HiveShim hiveShim;
-    private final RowType producedRowType;
+    private final RowType physicalRowType;

Review Comment:
   Why rename it to `physicalRowType`? IIUC, it holds the non-partition columns, but partition columns are physical columns too, so the name doesn't really capture that.



##########
flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java:
##########
@@ -157,14 +154,12 @@ void testReadFileWithSelectFields() throws IOException {
         partSpec.put("f1", 1);
         partSpec.put("f3", 3L);
         partSpec.put("f5", "f5");
-        partSpec.put("f8", BigDecimal.valueOf(5.333));

Review Comment:
   Why remove/change them?



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordMapper.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+/** Record mapper definition. */
+@FunctionalInterface
+public interface RecordMapper<I, O> {

Review Comment:
   Why move `RecordMapper` here? I think it's fine to keep it in its original place.



##########
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java:
##########
@@ -294,56 +286,6 @@ void testProjectionReadUnknownField(int rowGroupSize) throws IOException {
                 });
     }
 
-    @ParameterizedTest
-    @MethodSource("parameters")
-    void testPartitionValues(int rowGroupSize) throws IOException {

Review Comment:
   After removing the now-invalid tests from the parquet/orc input format tests, I think we should add a test for `FileInfoExtractorBulkFormat` to make sure it extracts partition columns correctly.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java:
##########
@@ -111,7 +114,9 @@ public HiveSourceBuilder(
             @Nullable String hiveVersion,
             @Nonnull String dbName,
             @Nonnull String tableName,
-            @Nonnull Map<String, String> tableOptions) {
+            @Nonnull Map<String, String> tableOptions,
+            @Nonnull TypeInformation<RowData> producedTypeInfo,

Review Comment:
   Do we really need to add these two arguments to the constructor?



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveMapredSplitReader.java:
##########
@@ -143,21 +139,6 @@ public HiveMapredSplitReader(
 
         // construct reuse row
         this.row = new GenericRowData(selectedFields.length);

Review Comment:
   `selectedFields` will contain partition columns; we should exclude them here, since partition columns are handled by the external wrapper.
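   For illustration, a rough sketch of what that could look like (just a sketch; `fieldNames` and `partitionKeys` are assumed to be available in this constructor and may not match the actual parameter names):
   ```java
   // Sketch: size the reuse row with non-partition fields only, since partition values
   // are filled in by the external wrapper.
   int[] nonPartitionSelectedFields =
           Arrays.stream(selectedFields)
                   .filter(i -> !partitionKeys.contains(fieldNames[i]))
                   .toArray();
   this.row = new GenericRowData(nonPartitionSelectedFields.length);
   ```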



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java:
##########
@@ -79,9 +76,8 @@ public class HiveInputFormat implements BulkFormat<RowData, HiveSourceSplit> {
     private final DataType[] fieldTypes;
     private final String hiveVersion;
     private final HiveShim hiveShim;
-    private final RowType producedRowType;
+    private final RowType physicalRowType;

Review Comment:
   Also, why not pass only the non-partition columns to `HiveInputFormat`? Then we wouldn't need to change this class much.
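   A rough sketch of that alternative on the builder side (only illustrative; `producedRowType` and `partitionKeys` are assumed to be the values already computed in `HiveSourceBuilder`):
   ```java
   // Sketch: strip partition keys from the produced row type before passing it to
   // HiveInputFormat, so the format only ever sees non-partition columns.
   List<RowType.RowField> nonPartitionFields =
           producedRowType.getFields().stream()
                   .filter(field -> !partitionKeys.contains(field.getName()))
                   .collect(Collectors.toList());
   RowType nonPartitionRowType =
           RowType.of(
                   nonPartitionFields.stream()
                           .map(RowType.RowField::getType)
                           .toArray(LogicalType[]::new),
                   nonPartitionFields.stream()
                           .map(RowType.RowField::getName)
                           .toArray(String[]::new));
   ```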



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java:
##########
@@ -334,37 +336,27 @@ private boolean isStreamingSource() {
                         STREAMING_SOURCE_ENABLE.defaultValue().toString()));
     }
 
-    private RowType getProducedRowType() {
-        TableSchema producedSchema;
-        if (projectedFields == null) {
-            producedSchema = fullSchema;
-        } else {
-            String[] fullNames = fullSchema.getFieldNames();
-            DataType[] fullTypes = fullSchema.getFieldDataTypes();
-            producedSchema =
-                    TableSchema.builder()
-                            .fields(
-                                    Arrays.stream(projectedFields)
-                                            .mapToObj(i -> fullNames[i])
-                                            .toArray(String[]::new),
-                                    Arrays.stream(projectedFields)
-                                            .mapToObj(i -> fullTypes[i])
-                                            .toArray(DataType[]::new))
-                            .build();
-        }
-        return (RowType) producedSchema.toRowDataType().bridgedTo(RowData.class).getLogicalType();
-    }
-
-    protected BulkFormat<RowData, HiveSourceSplit> createDefaultBulkFormat() {
-        return LimitableBulkFormat.create(
+    protected BulkFormat<RowData, HiveSourceSplit> createWrappedBulkFormat() {
+        HiveInputFormat inputFormat =
                 new HiveInputFormat(
                         new JobConfWrapper(jobConf),
                         partitionKeys,
                         fullSchema.getFieldNames(),
                         fullSchema.getFieldDataTypes(),
                         hiveVersion,
-                        getProducedRowType(),
-                        fallbackMappedReader),
-                limit);
+                        (RowType) producedType.bridgedTo(RowData.class).getLogicalType(),
+                        fallbackMappedReader);
+
+        FileInfoExtractorBulkFormat<HiveSourceSplit> bulkFormatWithPartitions =
+                new FileInfoExtractorBulkFormat<>(
+                        inputFormat,
+                        producedType,
+                        producedTypeInfo,
+                        new HashMap<>(),

Review Comment:
   nit: `Collections.emptyMap()`
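   i.e. roughly:
   ```java
   Collections.emptyMap(), // instead of new HashMap<>(); needs java.util.Collections
   ```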



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileInfoExtractorBulkFormat.java:
##########
@@ -23,95 +23,53 @@
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator;
-import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.utils.PartitionPathUtils;
 
 import java.io.IOException;
-import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * This {@link BulkFormat} is a wrapper that attaches file information columns to the output
  * records.
  */
-class FileInfoExtractorBulkFormat implements BulkFormat<RowData, FileSourceSplit> {
+public class FileInfoExtractorBulkFormat<SplitT extends FileSourceSplit>
+        implements BulkFormat<RowData, SplitT> {
 
-    private final BulkFormat<RowData, FileSourceSplit> wrapped;
-    private final TypeInformation<RowData> producedType;
-
-    private final List<FileSystemTableSource.FileInfoAccessor> metadataColumnsFunctions;
-    private final List<Map.Entry<String, DataType>> partitionColumnTypes;
-    private final int[] extendedRowIndexMapping;
-
-    private final String defaultPartName;
+    private final BulkFormat<RowData, SplitT> wrapped;
+    private final PartitionFieldExtractor<SplitT> partitionFieldExtractor;
+    private final FileInfoExtractor fileInfoExtractor;
+    public final List<FileSystemTableSource.FileInfoAccessor> metadataColumnsFunctions;
+    public final TypeInformation<RowData> producedType;
 
     public FileInfoExtractorBulkFormat(
-            BulkFormat<RowData, FileSourceSplit> wrapped,
+            BulkFormat<RowData, SplitT> wrapped,

Review Comment:
   Why change this?



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java:
##########
@@ -119,6 +130,11 @@ public HiveTableInputFormat(
                 projectedFields != null ? projectedFields : IntStream.range(0, rowArity).toArray();
         this.useMapRedReader = useMapRedReader;
         this.partitions = checkNotNull(partitions, "partitions can not be null.");
+        this.fileInfoExtractor =
+                new FileInfoExtractor(producedType, new ArrayList<>(), partitionKeys);

Review Comment:
   nit: `Collections.emptyList()`.
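   i.e. roughly:
   ```java
   this.fileInfoExtractor =
           new FileInfoExtractor(producedType, Collections.emptyList(), partitionKeys);
   ```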



##########
flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java:
##########
@@ -208,18 +207,9 @@ void testReadFileWithPartitionFields(@TempDir java.nio.file.Path tmpDir) throws
                         /* 1 */ DataTypes.INT().getLogicalType(), // part-1
                         /* 2 */ DataTypes.STRING().getLogicalType(),
                         /* 3 */ DataTypes.BIGINT().getLogicalType(), // part-2
-                        /* 4 */ DataTypes.STRING().getLogicalType(),

Review Comment:
   Why remove them?



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java:
##########
@@ -164,6 +186,41 @@ public void open(HiveTableInputSplit split) throws IOException {
         currentReadCount = 0L;
     }
 
+    private EnrichedRowData createEnrichedRow(HiveTableInputSplit inputSplit) {
+        // Fill the partition columns row
+        List<FileInfoExtractor.PartitionColumn> partitionColumns =
+                fileInfoExtractor.getPartitionColumns();
+        final GenericRowData fileInfoRowData = new GenericRowData(partitionColumns.size());
+        int fileInfoRowIndex = 0;
+        if (!partitionColumns.isEmpty()) {
+            final Map<String, String> partitionSpec =
+                    inputSplit.getHiveTablePartition().getPartitionSpec();
+            for (int partitionFieldIndex = 0;
+                    fileInfoRowIndex < fileInfoRowData.getArity();
+                    fileInfoRowIndex++, partitionFieldIndex++) {
+                FileInfoExtractor.PartitionColumn partition =
+                        partitionColumns.get(partitionFieldIndex);
+                if (!partitionSpec.containsKey(partition.fieldName)) {
+                    throw new RuntimeException(
+                            "Cannot find the partition value from path for partition: "
+                                    + partition.fieldName);
+                }
+
+                Object partitionValue =
+                        partitionFieldExtractor.extract(
+                                inputSplit,
+                                partition.fieldName,
+                                partition.dataType.getLogicalType());
+
+                fileInfoRowData.setField(

Review Comment:
   Why not use `PartitionPathUtils#convertStringToInternalValue`? Then we wouldn't need to call `partition.converter.toInternal`.
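   A rough sketch of that alternative (assuming the raw partition value string is taken from the split's partition spec, as in the code above, and that default-partition handling stays equivalent):
   ```java
   // Sketch: convert the partition value string straight to the internal representation,
   // so no per-column converter is needed.
   String valueString = partitionSpec.get(partition.fieldName);
   Object partitionValue =
           PartitionPathUtils.convertStringToInternalValue(valueString, partition.dataType);
   fileInfoRowData.setField(fileInfoRowIndex, partitionValue);
   ```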



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileInfoExtractor.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.types.DataType;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** A helper class to build the fixed and mutable row index mapping. */
+public class FileInfoExtractor implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final List<PartitionColumn> partitionColumns;
+    private final int[] extendedRowIndexMapping;
+
+    public FileInfoExtractor(
+            DataType producedDataType,
+            List<String> metadataColumns,
+            List<String> partitionColumns) {
+
+        // Compute index mapping for the extended row and the functions to compute metadata
+        List<DataTypes.Field> producedRowField = DataType.getFields(producedDataType);
+        List<String> producedRowFieldNames =
+                producedRowField.stream()
+                        .map(DataTypes.Field::getName)
+                        .collect(Collectors.toList());
+
+        // Filter out partition columns not in producedDataType
+        final List<String> partitionKeysToExtract =
+                DataType.getFieldNames(producedDataType).stream()
+                        .filter(partitionColumns::contains)
+                        .collect(Collectors.toList());
+
+        List<String> mutableRowFieldNames =
+                producedRowFieldNames.stream()
+                        .filter(
+                                key ->
+                                        !metadataColumns.contains(key)
+                                                && !partitionKeysToExtract.contains(key))
+                        .collect(Collectors.toList());
+
+        List<String> fixedRowFieldNames =
+                Stream.concat(metadataColumns.stream(), partitionKeysToExtract.stream())
+                        .collect(Collectors.toList());
+        this.partitionColumns =
+                partitionKeysToExtract.stream()
+                        .map(
+                                name ->
+                                        new PartitionColumn(
+                                                name,
+                                                producedRowField
+                                                        .get(producedRowFieldNames.indexOf(name))
+                                                        .getDataType()))
+                        .collect(Collectors.toList());
+
+        this.extendedRowIndexMapping =
+                EnrichedRowData.computeIndexMapping(
+                        producedRowFieldNames, mutableRowFieldNames, fixedRowFieldNames);
+    }
+
+    public List<PartitionColumn> getPartitionColumns() {
+        return partitionColumns;
+    }
+
+    public int[] getExtendedRowIndexMapping() {
+        return extendedRowIndexMapping;
+    }
+
+    /** Info of the partition column. */
+    public static class PartitionColumn implements Serializable {

Review Comment:
   Do we really need this class? 



##########
flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java:
##########
@@ -58,14 +56,10 @@ OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
                     Configuration hadoopConfig,
                     RowType tableType,
                     List<String> partitionKeys,

Review Comment:
   After the `partitionKeys` code is pruned, this method's name and comment should change accordingly.



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileInfoExtractor.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.types.DataType;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** A helper class to build the fixed and mutable row index mapping. */
+public class FileInfoExtractor implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final List<PartitionColumn> partitionColumns;
+    private final int[] extendedRowIndexMapping;
+
+    public FileInfoExtractor(
+            DataType producedDataType,
+            List<String> metadataColumns,
+            List<String> partitionColumns) {
+
+        // Compute index mapping for the extended row and the functions to compute metadata
+        List<DataTypes.Field> producedRowField = DataType.getFields(producedDataType);
+        List<String> producedRowFieldNames =
+                producedRowField.stream()
+                        .map(DataTypes.Field::getName)
+                        .collect(Collectors.toList());
+
+        // Filter out partition columns not in producedDataType
+        final List<String> partitionKeysToExtract =
+                DataType.getFieldNames(producedDataType).stream()
+                        .filter(partitionColumns::contains)
+                        .collect(Collectors.toList());
+
+        List<String> mutableRowFieldNames =
+                producedRowFieldNames.stream()
+                        .filter(
+                                key ->
+                                        !metadataColumns.contains(key)
+                                                && !partitionKeysToExtract.contains(key))
+                        .collect(Collectors.toList());
+
+        List<String> fixedRowFieldNames =
+                Stream.concat(metadataColumns.stream(), partitionKeysToExtract.stream())
+                        .collect(Collectors.toList());
+        this.partitionColumns =
+                partitionKeysToExtract.stream()
+                        .map(
+                                name ->
+                                        new PartitionColumn(
+                                                name,
+                                                producedRowField
+                                                        .get(producedRowFieldNames.indexOf(name))
+                                                        .getDataType()))
+                        .collect(Collectors.toList());
+
+        this.extendedRowIndexMapping =
+                EnrichedRowData.computeIndexMapping(
+                        producedRowFieldNames, mutableRowFieldNames, fixedRowFieldNames);
+    }
+
+    public List<PartitionColumn> getPartitionColumns() {

Review Comment:
   Can we just return the names of the partition columns? Then we wouldn't need `PartitionColumn`.
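   For example, something along these lines (just a sketch; it only works if the data types are not needed elsewhere):
   ```java
   // Sketch: keep only the names computed above (partitionKeysToExtract) and drop PartitionColumn.
   private final List<String> partitionColumnNames; // assigned from partitionKeysToExtract

   public List<String> getPartitionColumnNames() {
       return partitionColumnNames;
   }
   ```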



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org