Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/05/05 08:44:56 UTC

[GitHub] [iceberg] wuwenchi commented on a diff in pull request #4625: Flink: support watermark and computed columns

wuwenchi commented on code in PR #4625:
URL: https://github.com/apache/iceberg/pull/4625#discussion_r865682252


##########
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java:
##########
@@ -172,4 +217,110 @@ public static TableSchema toSchema(Schema schema) {
 
     return builder.build();
   }
+
+
+   /**
+   * Convert a {@link Schema} to a {@link Schema}.
+   *
+   * @param schema iceberg schema to convert.
+   * @return Flink Schema.
+   */
+  public static org.apache.flink.table.api.Schema toSchema(Schema schema, Map<String, String> properties) {
+
+    org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder();
+
+    // get watermark and computed columns
+    Map<String, String> watermarkMap = Maps.newHashMap();
+    Map<String, String> computedColumnsMap = Maps.newHashMap();
+    properties.keySet().stream()
+        .filter(k -> k.startsWith(FLINK_PREFIX) && properties.get(k) != null)
+        .forEach(k -> {
+          final String name = k.substring(k.lastIndexOf('.') + 1);
+          String expr = properties.get(k);
+          if (k.startsWith(WATERMARK_PREFIX)) {
+            watermarkMap.put(name, expr);
+          } else if (k.startsWith(COMPUTED_COLUMNS_PREFIX)) {
+            computedColumnsMap.put(name, expr);
+          }
+        });
+
+    // add physical columns.
+    for (RowType.RowField field : convert(schema).getFields()) {
+      builder.column(field.getName(), TypeConversions.fromLogicalToDataType(field.getType()));
+    }
+
+    // add computed columns.
+    computedColumnsMap.forEach(builder::columnByExpression);
+
+    // add watermarks.
+    watermarkMap.forEach(builder::watermark);
+
+    // add primary key.
+    List<String> primaryKey = getPrimaryKeyFromSchema(schema);
+    if (!primaryKey.isEmpty()) {
+      builder.primaryKey(primaryKey.toArray(new String[0]));
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Convert a {@link CatalogTable} to a {@link ResolvedSchema}.
+   *
+   * @param table flink unresolved schema to convert.
+   * @return Flink ResolvedSchema.
+   */
+  public static ResolvedSchema convertToResolvedSchema(CatalogTable table) {
+    StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
+    StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
+    CatalogManager catalogManager = ((TableEnvironmentImpl) streamTableEnvironment).getCatalogManager();
+    SchemaResolver schemaResolver = catalogManager.getSchemaResolver();
+    return table.getUnresolvedSchema().resolve(schemaResolver);

Review Comment:
   This approach is indeed better!
   However, the description is taken from the table's comment:
   ```
       @Override
       public Optional<String> getDescription() {
           return Optional.of(getComment());
       }
   ```
   
   The comment is not guaranteed to exist, and even when it does, it is not necessarily the table's path, so `tableEnvironment.from` may fail to find the table.
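
To make the concern concrete, here is a minimal sketch in plain Java. It does not use Flink's API: `FakeTable`, `sampleRegistry`, and `lookupByDescription` are hypothetical stand-ins, and the registry keyed by path is only an assumption about how a path-based lookup behaves. It shows that using the description as a path works only when the comment happens to equal the path.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class DescriptionLookupSketch {

    // Hypothetical stand-in for a catalog table that only carries a comment.
    static final class FakeTable {
        private final String comment; // may be null when no comment was set

        FakeTable(String comment) {
            this.comment = comment;
        }

        // The description is derived from the comment. This sketch uses
        // ofNullable so that a missing comment yields an empty Optional.
        Optional<String> getDescription() {
            return Optional.ofNullable(comment);
        }
    }

    // Hypothetical registry keyed by table path (an assumption for illustration).
    static Map<String, FakeTable> sampleRegistry() {
        Map<String, FakeTable> registry = new HashMap<>();
        registry.put("db.no_comment", new FakeTable(null));
        registry.put("db.free_text", new FakeTable("orders table"));
        registry.put("db.path_comment", new FakeTable("db.path_comment"));
        return registry;
    }

    // Treats the description as a path and looks it up in the registry.
    // The result is empty if there is no comment or the comment is not a known path.
    static Optional<FakeTable> lookupByDescription(Map<String, FakeTable> registry, FakeTable table) {
        return table.getDescription().map(registry::get);
    }

    public static void main(String[] args) {
        Map<String, FakeTable> registry = sampleRegistry();
        for (Map.Entry<String, FakeTable> entry : registry.entrySet()) {
            boolean found = lookupByDescription(registry, entry.getValue()).isPresent();
            System.out.println(entry.getKey() + " resolvable via description: " + found);
        }
    }
}
```

Only the table whose comment equals its own path can be resolved this way, which is why relying on the description looks fragile here.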



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

