You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/11/05 10:11:51 UTC
[flink] branch master updated: [FLINK-24778][table-common] Add
DataTypes#ROW(List<Field>) to reduce friction when using Stream of fields
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 94328f9 [FLINK-24778][table-common] Add DataTypes#ROW(List<Field>) to reduce friction when using Stream of fields
94328f9 is described below
commit 94328f90c5a3d13acff6dfbf2e22d1c7aa59c080
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Thu Nov 4 17:36:11 2021 +0100
[FLINK-24778][table-common] Add DataTypes#ROW(List<Field>) to reduce friction when using Stream of fields
Signed-off-by: slinkydeveloper <fr...@gmail.com>
This closes #17688.
---
.../java/org/apache/flink/table/api/DataTypes.java | 5 +++++
.../apache/flink/table/catalog/ResolvedSchema.java | 17 +++++++--------
.../flink/table/types/utils/DataTypeUtils.java | 24 ++--------------------
3 files changed, 14 insertions(+), 32 deletions(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
index 2b6b247..93abbd5 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
@@ -744,6 +744,11 @@ public final class DataTypes {
return new FieldsDataType(new RowType(logicalFields), fieldDataTypes);
}
+ /** @see #ROW(Field...) */
+ public static DataType ROW(List<Field> fields) {
+ return ROW(fields.toArray(new Field[0]));
+ }
+
/**
* Data type of a sequence of fields.
*
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
index ff7ffb2..5fab674 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
@@ -41,7 +40,6 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.flink.table.api.DataTypes.FIELD;
-import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.types.utils.DataTypeUtils.removeTimeAttribute;
/**
@@ -257,14 +255,13 @@ public final class ResolvedSchema {
// --------------------------------------------------------------------------------------------
- private FieldsDataType toRowDataType(Predicate<Column> columnPredicate) {
- final DataTypes.Field[] fields =
- columns.stream()
- .filter(columnPredicate)
- .map(ResolvedSchema::columnToField)
- .toArray(DataTypes.Field[]::new);
- // the row should never be null
- return (FieldsDataType) ROW(fields).notNull();
+ private DataType toRowDataType(Predicate<Column> columnPredicate) {
+ return columns.stream()
+ .filter(columnPredicate)
+ .map(ResolvedSchema::columnToField)
+ .collect(Collectors.collectingAndThen(Collectors.toList(), DataTypes::ROW))
+ // the row should never be null
+ .notNull();
}
private static DataTypes.Field columnToField(Column column) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
index 34f9b6e..4380761 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
@@ -162,28 +162,8 @@ public final class DataTypeUtils {
if (fields.size() == 0) {
return dataType;
}
-
- final RowType rowType = (RowType) dataType.getLogicalType();
- final List<RowField> newFields =
- Stream.concat(
- rowType.getFields().stream(),
- fields.stream()
- .map(
- f ->
- new RowField(
- f.getName(),
- f.getDataType().getLogicalType(),
- f.getDescription().orElse(null))))
- .collect(Collectors.toList());
- final RowType newRowType = new RowType(rowType.isNullable(), newFields);
-
- final List<DataType> newFieldDataTypes =
- Stream.concat(
- dataType.getChildren().stream(),
- fields.stream().map(DataTypes.Field::getDataType))
- .collect(Collectors.toList());
-
- return new FieldsDataType(newRowType, dataType.getConversionClass(), newFieldDataTypes);
+ return Stream.concat(DataType.getFields(dataType).stream(), fields.stream())
+ .collect(Collectors.collectingAndThen(Collectors.toList(), DataTypes::ROW));
}
/**