You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/24 08:34:04 UTC

[GitHub] [flink] JingsongLi commented on a diff in pull request #21182: [FLINK-29679][table] Migrate to new schema framework & show column comment

JingsongLi commented on code in PR #21182:
URL: https://github.com/apache/flink/pull/21182#discussion_r1031181983


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java:
##########
@@ -346,6 +346,41 @@ public Schema toSchema() {
         return builder.build();
     }
 
+    /** Helps to migrate to the new {@link Schema} class, retain comments when needed. */
+    public Schema toSchema(Map<String, String> comments) {
+        final Schema.Builder builder = Schema.newBuilder();
+
+        columns.forEach(
+                column -> {
+                    if (column instanceof PhysicalColumn) {
+                        final PhysicalColumn c = (PhysicalColumn) column;
+                        builder.column(c.getName(), c.getType());
+                    } else if (column instanceof MetadataColumn) {
+                        final MetadataColumn c = (MetadataColumn) column;
+                        builder.columnByMetadata(
+                                c.getName(),
+                                c.getType(),
+                                c.getMetadataAlias().orElse(null),
+                                c.isVirtual());
+                    } else if (column instanceof ComputedColumn) {
+                        final ComputedColumn c = (ComputedColumn) column;
+                        builder.columnByExpression(c.getName(), c.getExpression());
+                    } else {
+                        throw new IllegalArgumentException("Unsupported column type: " + column);
+                    }
+                    builder.withComment(comments.get(column.getName()));

Review Comment:
   Can it be null?
   We can improve this method a little bit.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1600,23 +1619,38 @@ private Object[][] buildTableColumns(ResolvedSchema schema) {
                                                             "PRI(%s)",
                                                             String.join(", ", columns))));
                         });
-
+        boolean nonComments = isSchemaNonColumnComments(schema);
         return schema.getColumns().stream()
                 .map(
                         (c) -> {
                             final LogicalType logicalType = c.getDataType().getLogicalType();
+                            if (nonComments) {
+                                return new Object[] {

Review Comment:
   Can we just create `List<Object>`, with comments, we can add a comment.
   Then `List.toArray` to a object array.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1600,23 +1619,38 @@ private Object[][] buildTableColumns(ResolvedSchema schema) {
                                                             "PRI(%s)",
                                                             String.join(", ", columns))));
                         });
-
+        boolean nonComments = isSchemaNonColumnComments(schema);
         return schema.getColumns().stream()
                 .map(
                         (c) -> {
                             final LogicalType logicalType = c.getDataType().getLogicalType();
+                            if (nonComments) {
+                                return new Object[] {
+                                    c.getName(),
+                                    logicalType.copy(true).asSummaryString(),
+                                    logicalType.isNullable(),
+                                    fieldToPrimaryKey.getOrDefault(c.getName(), null),
+                                    c.explainExtras().orElse(null),
+                                    fieldToWatermark.getOrDefault(c.getName(), null)
+                                };
+                            }
                             return new Object[] {
                                 c.getName(),
                                 logicalType.copy(true).asSummaryString(),
                                 logicalType.isNullable(),
                                 fieldToPrimaryKey.getOrDefault(c.getName(), null),
                                 c.explainExtras().orElse(null),
-                                fieldToWatermark.getOrDefault(c.getName(), null)
+                                fieldToWatermark.getOrDefault(c.getName(), null),
+                                c.getComment().orElse(null)
                             };
                         })
                 .toArray(Object[][]::new);
     }
 
+    private boolean isSchemaNonColumnComments(ResolvedSchema schema) {
+        return schema.getColumns().stream().noneMatch(col -> col.getComment().isPresent());

Review Comment:
   Minor: `schema.getColumns().stream().map(Column::getComment).noneMatch(Optional::isPresent);`
   This is just a matter of personal code style.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.java:
##########
@@ -62,13 +61,10 @@ protected TimeIndicatorTypeInfo(boolean isEventTime) {
         this.isEventTime = isEventTime;
     }
 
-    // this replaces the effective serializer by a LongSerializer
-    // it is a hacky but efficient solution to keep the object creation overhead low but still
-    // be compatible with the corresponding SqlTimestampTypeInfo
     @Override
     @SuppressWarnings("unchecked")
     public TypeSerializer<Timestamp> createSerializer(ExecutionConfig executionConfig) {
-        return (TypeSerializer) LongSerializer.INSTANCE;
+        return (TypeSerializer) LocalDateTimeSerializer.INSTANCE;

Review Comment:
   Can you explain why we need to modify this? Does it have any other impact?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java:
##########
@@ -346,6 +346,41 @@ public Schema toSchema() {
         return builder.build();
     }
 
+    /** Helps to migrate to the new {@link Schema} class, retain comments when needed. */
+    public Schema toSchema(Map<String, String> comments) {

Review Comment:
   I like this method~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org