You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/11/05 10:11:51 UTC

[flink] branch master updated: [FLINK-24778][table-common] Add DataTypes#ROW(List) to reduce friction when using Stream of fields

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 94328f9  [FLINK-24778][table-common] Add DataTypes#ROW(List<Field>) to reduce friction when using Stream of fields
94328f9 is described below

commit 94328f90c5a3d13acff6dfbf2e22d1c7aa59c080
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Thu Nov 4 17:36:11 2021 +0100

    [FLINK-24778][table-common] Add DataTypes#ROW(List<Field>) to reduce friction when using Stream of fields
    
    Signed-off-by: slinkydeveloper <fr...@gmail.com>
    
    This closes #17688.
---
 .../java/org/apache/flink/table/api/DataTypes.java |  5 +++++
 .../apache/flink/table/catalog/ResolvedSchema.java | 17 +++++++--------
 .../flink/table/types/utils/DataTypeUtils.java     | 24 ++--------------------
 3 files changed, 14 insertions(+), 32 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
index 2b6b247..93abbd5 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
@@ -744,6 +744,11 @@ public final class DataTypes {
         return new FieldsDataType(new RowType(logicalFields), fieldDataTypes);
     }
 
+    /** @see #ROW(Field...) */
+    public static DataType ROW(List<Field> fields) {
+        return ROW(fields.toArray(new Field[0]));
+    }
+
     /**
      * Data type of a sequence of fields.
      *
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
index ff7ffb2..5fab674 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.types.AbstractDataType;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
@@ -41,7 +40,6 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.table.api.DataTypes.FIELD;
-import static org.apache.flink.table.api.DataTypes.ROW;
 import static org.apache.flink.table.types.utils.DataTypeUtils.removeTimeAttribute;
 
 /**
@@ -257,14 +255,13 @@ public final class ResolvedSchema {
 
     // --------------------------------------------------------------------------------------------
 
-    private FieldsDataType toRowDataType(Predicate<Column> columnPredicate) {
-        final DataTypes.Field[] fields =
-                columns.stream()
-                        .filter(columnPredicate)
-                        .map(ResolvedSchema::columnToField)
-                        .toArray(DataTypes.Field[]::new);
-        // the row should never be null
-        return (FieldsDataType) ROW(fields).notNull();
+    private DataType toRowDataType(Predicate<Column> columnPredicate) {
+        return columns.stream()
+                .filter(columnPredicate)
+                .map(ResolvedSchema::columnToField)
+                .collect(Collectors.collectingAndThen(Collectors.toList(), DataTypes::ROW))
+                // the row should never be null
+                .notNull();
     }
 
     private static DataTypes.Field columnToField(Column column) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
index 34f9b6e..4380761 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
@@ -162,28 +162,8 @@ public final class DataTypeUtils {
         if (fields.size() == 0) {
             return dataType;
         }
-
-        final RowType rowType = (RowType) dataType.getLogicalType();
-        final List<RowField> newFields =
-                Stream.concat(
-                                rowType.getFields().stream(),
-                                fields.stream()
-                                        .map(
-                                                f ->
-                                                        new RowField(
-                                                                f.getName(),
-                                                                f.getDataType().getLogicalType(),
-                                                                f.getDescription().orElse(null))))
-                        .collect(Collectors.toList());
-        final RowType newRowType = new RowType(rowType.isNullable(), newFields);
-
-        final List<DataType> newFieldDataTypes =
-                Stream.concat(
-                                dataType.getChildren().stream(),
-                                fields.stream().map(DataTypes.Field::getDataType))
-                        .collect(Collectors.toList());
-
-        return new FieldsDataType(newRowType, dataType.getConversionClass(), newFieldDataTypes);
+        return Stream.concat(DataType.getFields(dataType).stream(), fields.stream())
+                .collect(Collectors.collectingAndThen(Collectors.toList(), DataTypes::ROW));
     }
 
     /**