Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/06 12:04:04 UTC

[GitHub] [flink] twalthr commented on a change in pull request #17350: [FLINK-24359][table-runtime] Use ResolvedSchema in AbstractFileSystemTable

twalthr commented on a change in pull request #17350:
URL: https://github.com/apache/flink/pull/17350#discussion_r723166638



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/AbstractFileSystemTable.java
##########
@@ -39,7 +39,8 @@
     final DynamicTableFactory.Context context;
     final ObjectIdentifier tableIdentifier;
     final Configuration tableOptions;
-    final TableSchema schema;
+    final ResolvedSchema schema;

Review comment:
       both schema and dataType can now be obtained easily from the `context` member, so we don't need additional members and the code stays short and simple
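       A minimal sketch of the idea, assuming `context` is the `DynamicTableFactory.Context` already stored in `AbstractFileSystemTable` (the helper method names below are illustrative, not part of the PR):

           // derive the schema and physical row type on demand from the context
           // instead of storing them as extra members
           ResolvedSchema schema() {
               return context.getCatalogTable().getResolvedSchema();
           }

           DataType physicalRowDataType() {
               return context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
           }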

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/DeserializationSchemaAdapter.java
##########
@@ -73,31 +72,32 @@
 
     public DeserializationSchemaAdapter(
             DeserializationSchema<RowData> deserializationSchema,
-            TableSchema schema,
+            DataType physicalDataType,
             int[] projectFields,
             List<String> partitionKeys,
             String defaultPartValue) {
         this.deserializationSchema = deserializationSchema;
-        this.fieldNames = schema.getFieldNames();
-        this.fieldTypes = schema.getFieldDataTypes();
+        final RowType rowType = ((RowType) physicalDataType.getLogicalType());

Review comment:
       nit: use the `DataType.getFieldNames` method and the streams API here, and the same for the types?
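       For illustration, the constructor body could then look roughly like this, assuming the static `DataType.getFieldNames`/`DataType.getFieldDataTypes` helpers and that `fieldNames`/`fieldTypes` stay arrays (a sketch, not the final code):

           // the static DataType helpers avoid the cast to RowType
           this.fieldNames =
                   DataType.getFieldNames(physicalDataType).toArray(new String[0]);
           // either a plain toArray(new DataType[0]) or the streams API works here
           this.fieldTypes =
                   DataType.getFieldDataTypes(physicalDataType).stream()
                           .toArray(DataType[]::new);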

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -310,20 +316,18 @@ public String asSummaryString() {
 
     private int[] readFields() {
         return projectedFields == null
-                ? IntStream.range(0, schema.getFieldCount()).toArray()
+                ? IntStream.range(0, DataType.getFieldCount(getPhysicalDataType())).toArray()
                 : Arrays.stream(projectedFields).mapToInt(array -> array[0]).toArray();
     }
 
-    private DataType getProducedDataType() {
-        int[] fields = readFields();
-        String[] schemaFieldNames = schema.getFieldNames();
-        DataType[] schemaTypes = schema.getFieldDataTypes();
-
-        return DataTypes.ROW(
-                        Arrays.stream(fields)
-                                .mapToObj(i -> DataTypes.FIELD(schemaFieldNames[i], schemaTypes[i]))
-                                .toArray(DataTypes.Field[]::new))
-                .bridgedTo(RowData.class)
-                .notNull();
+    private DataType getProjectedDataType() {
+        DataType physicalDataType = super.getPhysicalDataType();

Review comment:
       nit: could be `final`
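       In other words, just the declaration from the hunk above with the modifier added:

           final DataType physicalDataType = super.getPhysicalDataType();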

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -158,8 +159,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) {
     private RowDataPartitionComputer partitionComputer() {
         return new RowDataPartitionComputer(
                 defaultPartName,
-                schema.getFieldNames(),
-                schema.getFieldDataTypes(),
+                DataType.getFieldNames(getPhysicalDataType()).toArray(new String[] {}),

Review comment:
       nit: in Flink we usually use `toArray(new String[0])` instead of `toArray(new String[] {})`;
   see https://stackoverflow.com/questions/9572795/convert-list-to-array-in-java
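       Applied to the line above, the call would read (same behavior, just the preferred idiom):

           DataType.getFieldNames(getPhysicalDataType()).toArray(new String[0]),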

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -331,7 +338,7 @@ private RowDataPartitionComputer partitionComputer() {
         return new FileSystemFormatFactory.ReaderContext() {
             @Override
             public TableSchema getSchema() {

Review comment:
       can we update this to `DataType`? It would be good to get rid of `TableSchemaUtils`.
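       A rough sketch of how the `ReaderContext` override might change if the interface moves from `TableSchema` to `DataType` (the method name and shape below are assumptions for illustration only, not the actual interface):

           @Override
           public DataType getPhysicalDataType() {
               // hypothetical replacement for getSchema(); the qualified call
               // delegates to the enclosing sink rather than recursing into this override
               return FileSystemTableSink.this.getPhysicalDataType();
           }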




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
