Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/12/07 06:09:22 UTC

[GitHub] [iceberg] hameizi opened a new pull request #3681: Add watermark in flink table

hameizi opened a new pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681


   Adding a watermark to a table lets Flink process its data in event time, so we can compute streaming windows based on the watermark and join against the 'versioned tables' that Flink defines.
   In this PR the watermark information is stored in the table properties, so it does not affect the original Iceberg schema.
   After this PR we can create a table with a watermark using SQL like the following:
    CREATE TABLE catalog.database.table(
       id BIGINT COMMENT 'unique id',
       data STRING,
       testTime TIMESTAMP(3),
       PRIMARY KEY(id) NOT ENFORCED,
       WATERMARK FOR testTime AS testTime - INTERVAL '5' SECOND
    ) WITH (
       'format-version' = '2'
    );
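
To make the `WATERMARK FOR testTime AS testTime - INTERVAL '5' SECOND` clause above more concrete, here is a minimal plain-Java sketch of a bounded-delay watermark. It is a simplification for illustration only: the class `BoundedDelayWatermark` is hypothetical and is not part of Flink, Iceberg, or this PR.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper (not a Flink or Iceberg class): models a watermark that
// trails the highest event time seen so far by a fixed delay.
final class BoundedDelayWatermark {
  private final long delayMillis;
  private long maxEventTimeMillis = Long.MIN_VALUE;

  BoundedDelayWatermark(Duration delay) {
    this.delayMillis = delay.toMillis();
  }

  // Record the event time of one row and return the watermark after seeing it.
  long onEvent(Instant eventTime) {
    maxEventTimeMillis = Math.max(maxEventTimeMillis, eventTime.toEpochMilli());
    return currentWatermark();
  }

  // Watermark = highest event time so far minus the allowed delay.
  long currentWatermark() {
    return maxEventTimeMillis == Long.MIN_VALUE
        ? Long.MIN_VALUE
        : maxEventTimeMillis - delayMillis;
  }

  // A row counts as late when its event time is not after the current watermark.
  boolean isLate(Instant eventTime) {
    return eventTime.toEpochMilli() <= currentWatermark();
  }
}
```

With a 5-second delay, a row stamped 10 s moves the watermark to 5 s. A following row stamped 8 s is out of order but still accepted, while a row stamped 4 s would be treated as late.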


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r778266930



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -377,6 +378,23 @@ void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
     PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
 
     ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+
+    List<WatermarkSpec> watermarkSpecs = table.getSchema().getWatermarkSpecs();
+    if (watermarkSpecs != null) {
+      for (int i = 0; i < watermarkSpecs.size(); i++) {

Review comment:
       I am not familiar with that part of Flink. Can you help me understand the list of watermark specs? Is it meant to define multiple watermark strategies so that the writer can pick one to use?






[GitHub] [iceberg] hameizi commented on pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#issuecomment-1064809386


   > > I have another concern about this change: should we bind the watermark strategy within the table metadata? I prefer binding it with the job instead of the table.
   > 
   > +1 on @yittg 's suggestion. Different jobs may want to set the watermark differently.
   
   But there is one open question: where would we save the watermark properties if we bind the watermark strategy to the job? Maybe that would need extra storage?




[GitHub] [iceberg] stevenzwu commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r778271866



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
##########
@@ -149,14 +156,43 @@ public static TableSchema toSchema(RowType rowType) {
    * @param schema iceberg schema to convert.
    * @return Flink TableSchema.
    */
-  public static TableSchema toSchema(Schema schema) {

Review comment:
       For backward compatibility, we should probably keep the old method signature and add a new one alongside it.
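
A minimal sketch of that suggestion, assuming the old single-argument method simply delegates to the new overload with an empty property map. To stay self-contained, `String` stands in for both Iceberg's `Schema` and Flink's `TableSchema`, and the property key is hypothetical.

```java
import java.util.Collections;
import java.util.Map;

// Stand-in types: String replaces Iceberg's Schema and Flink's TableSchema.
final class SchemaConversionSketch {
  private SchemaConversionSketch() {}

  // Old signature is kept, so existing callers compile and behave as before.
  static String toSchema(String icebergSchema) {
    return toSchema(icebergSchema, Collections.emptyMap());
  }

  // New overload that additionally consults the table properties.
  static String toSchema(String icebergSchema, Map<String, String> properties) {
    String rowtime = properties.get("flink.watermark.0.rowtime"); // hypothetical key
    return rowtime == null ? icebergSchema : icebergSchema + " WATERMARK FOR " + rowtime;
  }
}
```

Callers that never pass properties get exactly the old result, so the change is additive.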






[GitHub] [iceberg] stevenzwu commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r778975310



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -377,6 +378,23 @@ void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
     PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
 
     ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+
+    List<WatermarkSpec> watermarkSpecs = table.getSchema().getWatermarkSpecs();
+    if (watermarkSpecs != null) {
+      for (int i = 0; i < watermarkSpecs.size(); i++) {

Review comment:
       Then we need to add validation that only one watermark spec is defined.
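
One way such a check could look, as a sketch only. The generic list stands in for the `List<WatermarkSpec>` obtained in the diff, and the class name, exception type, and message are assumptions.

```java
import java.util.List;

final class WatermarkSpecValidation {
  private WatermarkSpecValidation() {}

  // `specs` stands in for the List<WatermarkSpec> read from the table schema.
  // Returns the single spec, or null when no watermark is declared.
  static <T> T requireAtMostOne(List<T> specs) {
    if (specs == null || specs.isEmpty()) {
      return null;
    }
    if (specs.size() > 1) {
      throw new IllegalArgumentException(
          "Expected at most one watermark spec, but found " + specs.size());
    }
    return specs.get(0);
  }
}
```

Failing fast here would also let the property writer drop the index loop, since there would be at most one spec to store.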






[GitHub] [iceberg] yittg commented on pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
yittg commented on pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#issuecomment-1064904034


   @kingeasternsun, yeah, it does work. Honestly, this is how we are using it currently. Maybe nothing needs to be changed, but I have to say it is weird. :)




[GitHub] [iceberg] hameizi commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
hameizi commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r778572856



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -377,6 +378,23 @@ void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
     PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
 
     ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+
+    List<WatermarkSpec> watermarkSpecs = table.getSchema().getWatermarkSpecs();
+    if (watermarkSpecs != null) {
+      for (int i = 0; i < watermarkSpecs.size(); i++) {

Review comment:
       In Flink, only the first watermark strategy is applied.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r778280984



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
##########
@@ -172,4 +208,16 @@ public static TableSchema toSchema(Schema schema) {
 
     return builder.build();
   }
+
+  private static Optional<String> optionalGet(String key, Map<String, String> properties) {
+    return Optional.ofNullable(properties.get(key));
+  }
+
+  private static Supplier<TableException> exceptionSupplier(String key) {
+    return () -> {
+      throw new TableException(
+          "Property with key '" + key + "' could not be found. " +
+              "This is a bug because the validation logic should have checked that before.");

Review comment:
       nit: this part seems redundant






[GitHub] [iceberg] hameizi commented on pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#issuecomment-1057788083


   > I have another concern about this change: should we bind the watermark strategy within the table metadata? I prefer binding it with the job instead of the table.
   
   @yittg 




[GitHub] [iceberg] stevenzwu commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r778269293



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -277,7 +279,6 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno
         if (!removals.isEmpty()) {
           asNamespaceCatalog.removeProperties(namespace, removals);
         }
-

Review comment:
       nit: avoid unnecessary whitespace change






[GitHub] [iceberg] stevenzwu commented on pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#issuecomment-1064472790


   > I have another concern about this change: should we bind the watermark strategy within the table metadata? I prefer binding it with the job instead of the table.
   
   +1 on @yittg 's suggestion




[GitHub] [iceberg] kingeasternsun edited a comment on pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
kingeasternsun edited a comment on pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#issuecomment-1064887971


   Hello everyone, I think maybe we can just use the Iceberg connector to solve this, with no need to modify code. @hameizi @yittg
   
   First, use Flink to create an Iceberg table without `Computed columns` or a watermark, like this:
   ```java
       tenv.executeSql("CREATE CATALOG iceberg_catalog WITH (\n" +
           "  'type'='iceberg',\n" +
           "  'catalog-type'='hive'," +
           "  'uri'='thrift://host:9083'," +
           "  'warehouse'='hdfs://host:9009/user/hive/warehouse/'" +
           ")");
   
       tenv.useCatalog("iceberg_catalog");
       tenv.useDatabase("iceberg_db");
   
       String tableName = "iceberg_origin";
   
       tenv.executeSql("create table if not exists  " + tableName + "(\n" +
           " order_id    STRING,\n" +
           " price       DECIMAL(32,2),\n" +
           " currency    STRING,\n" +
           " origin_order_time TIMESTAMP(6)\n" +
           ") WITH (\n" +
           " 'format-version' = '2'\n" +
           ")"
       );
   ```
   
   Then use the Iceberg connector to create a table source with `Computed columns` and watermark support, like this:
   ```java
       tenv.executeSql("create table if not exists  default_catalog.default_database.iceberg_table (\n" +
           " order_id    STRING,\n" +
           " price       DECIMAL(32,2),\n" +
           " currency    STRING,\n" +
           " origin_order_time TIMESTAMP(6),\n" +
           " order_time as cast(origin_order_time as TIMESTAMP(3)),\n" +
           " WATERMARK FOR order_time AS order_time - INTERVAL '5' MINUTE\n" +
           ") WITH (\n" +
           " 'connector' = 'iceberg',\n" +
           " 'catalog-name' = 'iceberg_catalog',\n" +
           " 'catalog-database' = 'iceberg_db',\n" +
           " 'catalog-table' = 'iceberg_origin',\n" +
           "  'uri'='thrift://host:9083'," +
           "  'warehouse'='hdfs://host:9009/user/hive/warehouse/'" +
           ")"
       );
   
   ```
   
   If a different watermark strategy is needed, we just need to create another table source under a different name, for example one with a 10-minute watermark:
   ```java
       tenv.executeSql("create table if not exists  default_catalog.default_database.iceberg_table_10min (\n" +
           " order_id    STRING,\n" +
           " price       DECIMAL(32,2),\n" +
           " currency    STRING,\n" +
           " origin_order_time TIMESTAMP(6),\n" +
           " order_time as cast(origin_order_time as TIMESTAMP(3)),\n" +
           " WATERMARK FOR order_time AS order_time - INTERVAL '10' MINUTE\n" +
           ") WITH (\n" +
           " 'connector' = 'iceberg',\n" +
           " 'catalog-name' = 'iceberg_catalog',\n" +
           " 'catalog-database' = 'iceberg_db',\n" +
           " 'catalog-table' = 'iceberg_origin',\n" +
           "  'uri'='thrift://host:9083'," +
           "  'warehouse'='hdfs://host:9009/user/hive/warehouse/'" +
           ")"
       );
   
   ```




[GitHub] [iceberg] yittg commented on pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
yittg commented on pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#issuecomment-1054072475


   Looks like this PR can fix #2264




[GitHub] [iceberg] yittg commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
yittg commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r817408949



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
##########
@@ -149,14 +158,44 @@ public static TableSchema toSchema(RowType rowType) {
    * @param schema iceberg schema to convert.
    * @return Flink TableSchema.
    */
-  public static TableSchema toSchema(Schema schema) {
+  public static TableSchema toSchema(Schema schema, Map<String, String> properties) {
     TableSchema.Builder builder = TableSchema.builder();
 
     // Add columns.
     for (RowType.RowField field : convert(schema).getFields()) {
       builder.field(field.getName(), TypeConversions.fromLogicalToDataType(field.getType()));
     }
 
+    // Add watermark
+    final int watermarkCount =

Review comment:
       @kingeasternsun yeah, I think so, almost.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r778272735



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java
##########
@@ -108,10 +108,10 @@ public LogicalType primitive(Type.PrimitiveType primitive) {
         Types.TimestampType timestamp = (Types.TimestampType) primitive;
         if (timestamp.shouldAdjustToUTC()) {
           // MICROS
-          return new LocalZonedTimestampType(6);
+          return new LocalZonedTimestampType(3);

Review comment:
       Why this change? Same question for line 114.
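
For context on what the diff would change: Iceberg timestamps carry microsecond precision (the `// MICROS` comment), so mapping them to a Flink type of precision 3 drops the last three fractional digits. The thread does not state the motivation. One plausible reason, offered only as an assumption, is that Flink expects rowtime attributes used in a `WATERMARK` clause to be of precision 3, which is also why the connector example elsewhere in this thread casts to `TIMESTAMP(3)`. The sketch below shows the loss, assuming truncation rather than rounding. The class name is hypothetical.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

final class TimestampPrecision {
  private TimestampPrecision() {}

  // Drop everything below milliseconds, as going from precision 6 to
  // precision 3 would (truncation is assumed here, not rounding).
  static LocalDateTime toMillisPrecision(LocalDateTime microsPrecision) {
    return microsPrecision.truncatedTo(ChronoUnit.MILLIS);
  }
}
```

A value such as `06:09:22.123456` becomes `06:09:22.123`, so any reader relying on the microsecond digits would see different data after the change.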






[GitHub] [iceberg] hameizi commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
hameizi commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r778572856



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -377,6 +378,23 @@ void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
     PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
 
     ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+
+    List<WatermarkSpec> watermarkSpecs = table.getSchema().getWatermarkSpecs();
+    if (watermarkSpecs != null) {
+      for (int i = 0; i < watermarkSpecs.size(); i++) {

Review comment:
       Flink supports only one watermark spec per table; defining more than one throws `Multiple watermark definition is not supported yet.`.






[GitHub] [iceberg] hameizi commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
hameizi commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r778572472



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -377,6 +378,23 @@ void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
     PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
 
     ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+
+    List<WatermarkSpec> watermarkSpecs = table.getSchema().getWatermarkSpecs();

Review comment:
       Yes, it should be updated. But a lot of other work is related to this, so maybe it should be done in a separate PR.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r778268907



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -377,6 +378,23 @@ void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
     PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
 
     ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+
+    List<WatermarkSpec> watermarkSpecs = table.getSchema().getWatermarkSpecs();
+    if (watermarkSpecs != null) {
+      for (int i = 0; i < watermarkSpecs.size(); i++) {
+        WatermarkSpec watermarkSpec = watermarkSpecs.get(i);
+        properties.put(
+            DescriptorProperties.WATERMARK + '.' + i + '.' + DescriptorProperties.WATERMARK_ROWTIME,

Review comment:
       Since these watermark spec properties are Flink-specific, we should probably add a `flink.` prefix.
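
A sketch of how prefixed keys might be assembled. The string values of `WATERMARK`, `ROWTIME`, and `STRATEGY_EXPR` below are assumed stand-ins for the `DescriptorProperties` constants used in the diff, and the class itself is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class WatermarkPropertyKeys {
  // Namespace suggested in the review comment above.
  static final String FLINK_PREFIX = "flink.";
  // Assumed stand-ins for the DescriptorProperties constants used in the diff.
  static final String WATERMARK = "watermark";
  static final String ROWTIME = "rowtime";
  static final String STRATEGY_EXPR = "strategy.expr";

  private WatermarkPropertyKeys() {}

  // Build a key such as flink.watermark.0.rowtime.
  static String key(int index, String suffix) {
    return FLINK_PREFIX + WATERMARK + '.' + index + '.' + suffix;
  }

  // Collect the properties describing one watermark spec.
  static Map<String, String> toProperties(int index, String rowtimeColumn, String strategyExpr) {
    Map<String, String> properties = new LinkedHashMap<>();
    properties.put(key(index, ROWTIME), rowtimeColumn);
    properties.put(key(index, STRATEGY_EXPR), strategyExpr);
    return properties;
  }
}
```

Namespacing the keys this way keeps them from colliding with Iceberg's own table properties and makes it obvious to other engines that they can be ignored.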






[GitHub] [iceberg] stevenzwu commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r778280766



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
##########
@@ -172,4 +208,16 @@ public static TableSchema toSchema(Schema schema) {
 
     return builder.build();
   }
+
+  private static Optional<String> optionalGet(String key, Map<String, String> properties) {
+    return Optional.ofNullable(properties.get(key));
+  }
+
+  private static Supplier<TableException> exceptionSupplier(String key) {
+    return () -> {
+      throw new TableException(
+          "Property with key '" + key + "' could not be found. " +

Review comment:
       nit: `Property` -> `Required property`
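
For illustration, here is a self-contained variant of the two helpers with the reworded message. It differs from the diff in two labelled ways: `IllegalStateException` stands in for Flink's `TableException`, and the supplier returns the exception instead of throwing inside the lambda, which is the usual shape for `Optional.orElseThrow`. The class name and the `require` method are hypothetical.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

final class RequiredProperty {
  private RequiredProperty() {}

  static Optional<String> optionalGet(String key, Map<String, String> properties) {
    return Optional.ofNullable(properties.get(key));
  }

  // IllegalStateException stands in for Flink's TableException.
  static Supplier<IllegalStateException> missing(String key) {
    return () -> new IllegalStateException(
        "Required property with key '" + key + "' could not be found.");
  }

  // Look up a key and fail with the message above when it is absent.
  static String require(String key, Map<String, String> properties) {
    return optionalGet(key, properties).orElseThrow(missing(key));
  }
}
```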
   






[GitHub] [iceberg] hameizi commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
hameizi commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r778574501



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
##########
@@ -149,14 +156,43 @@ public static TableSchema toSchema(RowType rowType) {
    * @param schema iceberg schema to convert.
    * @return Flink TableSchema.
    */
-  public static TableSchema toSchema(Schema schema) {
+  public static TableSchema toSchema(Schema schema, Map<String, String> properties) {
     TableSchema.Builder builder = TableSchema.builder();
 
     // Add columns.
     for (RowType.RowField field : convert(schema).getFields()) {
       builder.field(field.getName(), TypeConversions.fromLogicalToDataType(field.getType()));
     }
 
+    // Add watermark
+    final int watermarkCount =
+        properties.keySet().stream()
+            .filter(
+                (k) ->

Review comment:
       This logic is similar to Flink's, so I think it is more suitable, and we can update it along with the Flink version in the future.
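
The diff is cut off inside the filter, so the exact predicate is not visible here. As a rough sketch of the general idea only (deriving the number of watermark specs from indexed property keys as highest index plus one), with a hypothetical key layout of `flink.watermark.<index>.<suffix>`:

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class WatermarkCount {
  // Hypothetical key layout: flink.watermark.<index>.<suffix>
  private static final Pattern KEY = Pattern.compile("flink\\.watermark\\.(\\d+)\\..+");

  private WatermarkCount() {}

  // Count = highest index found among matching keys, plus one (0 if none match).
  static int count(Map<String, String> properties) {
    int maxIndex = -1;
    for (String key : properties.keySet()) {
      Matcher matcher = KEY.matcher(key);
      if (matcher.matches()) {
        maxIndex = Math.max(maxIndex, Integer.parseInt(matcher.group(1)));
      }
    }
    return maxIndex + 1;
  }
}
```

If validation guarantees a single spec, this count can only be 0 or 1, which may be an argument for a simpler presence check.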






[GitHub] [iceberg] stevenzwu commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r778991249



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
##########
@@ -52,6 +59,8 @@
  */
 public class FlinkSchemaUtil {
 
+  public static final String flinkPrefix = "flink.";

Review comment:
       Static constant names should be all upper case.
   
   Also, I am not sure this is the right class to host this constant. I am wondering if we should introduce a `FlinkTableProperties` class to host it.
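
A sketch of what such a holder class might contain. The class is hypothetical: only its name and the `flink.` value come from this thread, and the `stripPrefix` helper is an illustrative assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical class following the name floated in the comment above.
final class FlinkTableProperties {
  // Upper-case name, per the naming convention for static constants.
  static final String FLINK_PREFIX = "flink.";

  private FlinkTableProperties() {}

  // Return only the Flink-specific entries, with the prefix removed from each key.
  static Map<String, String> stripPrefix(Map<String, String> tableProperties) {
    Map<String, String> result = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : tableProperties.entrySet()) {
      if (entry.getKey().startsWith(FLINK_PREFIX)) {
        result.put(entry.getKey().substring(FLINK_PREFIX.length()), entry.getValue());
      }
    }
    return result;
  }
}
```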






[GitHub] [iceberg] hameizi commented on pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#issuecomment-987609838


   @openinx @stevenzwu can you help take a look?




[GitHub] [iceberg] kingeasternsun commented on pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
kingeasternsun commented on pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#issuecomment-1064887971


   Hello everyone, I think maybe we can just use the Iceberg connector to solve this, with no need to modify code.
   
   First, use Flink to create an Iceberg table without `Computed columns` or a watermark, like this:
   ```java
       tenv.executeSql("CREATE CATALOG iceberg_catalog WITH (\n" +
           "  'type'='iceberg',\n" +
           "  'catalog-type'='hive'," +
           "  'uri'='thrift://host:9083'," +
           "  'warehouse'='hdfs://host:9009/user/hive/warehouse/'" +
           ")");
   
       tenv.useCatalog("iceberg_catalog");
       tenv.useDatabase("iceberg_db");
   
       String tableName = "iceberg_origin";
   
       tenv.executeSql("create table if not exists  " + tableName + "(\n" +
           " order_id    STRING,\n" +
           " price       DECIMAL(32,2),\n" +
           " currency    STRING,\n" +
           " origin_order_time TIMESTAMP(6)\n" +
           ") WITH (\n" +
           " 'format-version' = '2'\n" +
           ")"
       );
   ```
   
   Then use the Iceberg connector to add computed columns and watermark support, like this:
   ```java
       tenv.executeSql("create table if not exists  default_catalog.default_database.iceberg_table (\n" +
           " order_id    STRING,\n" +
           " price       DECIMAL(32,2),\n" +
           " currency    STRING,\n" +
           " origin_order_time TIMESTAMP(6),\n" +
           " order_time as cast(origin_order_time as TIMESTAMP(3)),\n" +
           " WATERMARK FOR order_time AS order_time - INTERVAL '5' MINUTE\n" +
           ") WITH (\n" +
           " 'connector' = 'iceberg',\n" +
           " 'catalog-name' = 'iceberg_catalog',\n" +
           " 'catalog-database' = 'iceberg_db',\n" +
           " 'catalog-table' = 'iceberg_origin',\n" +
           "  'uri'='thrift://host:9083'," +
           "  'warehouse'='hdfs://host:9009/user/hive/warehouse/'" +
           ")"
       );
   
   ```




[GitHub] [iceberg] yittg commented on pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
yittg commented on pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#issuecomment-1065034492


   I think saving the watermark information, if provided, in FlinkCatalog may be an option. What do you think, @stevenzwu @hameizi?




[GitHub] [iceberg] rdblue commented on pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#issuecomment-991950250


   @stevenzwu, can you take a look at this one?




[GitHub] [iceberg] hameizi commented on pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#issuecomment-1005495564


   @stevenzwu, I pushed a new commit with the fix.




[GitHub] [iceberg] hameizi commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
hameizi commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r778573370



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java
##########
@@ -108,10 +108,10 @@ public LogicalType primitive(Type.PrimitiveType primitive) {
         Types.TimestampType timestamp = (Types.TimestampType) primitive;
         if (timestamp.shouldAdjustToUTC()) {
           // MICROS
-          return new LocalZonedTimestampType(6);
+          return new LocalZonedTimestampType(3);

Review comment:
       I changed this because Flink watermarks only support timestamp types with precision 0 to 3.






[GitHub] [iceberg] hameizi commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
hameizi commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r778573036



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -377,6 +378,23 @@ void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
     PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
 
     ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+
+    List<WatermarkSpec> watermarkSpecs = table.getSchema().getWatermarkSpecs();
+    if (watermarkSpecs != null) {
+      for (int i = 0; i < watermarkSpecs.size(); i++) {
+        WatermarkSpec watermarkSpec = watermarkSpecs.get(i);
+        properties.put(
+            DescriptorProperties.WATERMARK + '.' + i + '.' + DescriptorProperties.WATERMARK_ROWTIME,

Review comment:
       OK, I will add the `flink.` prefix.
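
As a rough illustration of the prefixed keys discussed here, the sketch below builds and counts watermark properties with plain Java maps. The constant values (`flink.`, `schema.watermark`, `rowtime`, `strategy.expr`) and the class and method names are assumptions made for this example; the PR itself takes the key fragments from Flink's `DescriptorProperties`, whose exact values are not shown in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WatermarkPropertyKeys {
  // Assumed stand-ins for illustration; the PR uses constants from DescriptorProperties.
  static final String FLINK_PREFIX = "flink.";
  static final String WATERMARK = "schema.watermark";
  static final String ROWTIME = "rowtime";
  static final String STRATEGY_EXPR = "strategy.expr";

  // Builds a key such as "flink.schema.watermark.0.rowtime".
  static String key(int index, String suffix) {
    return FLINK_PREFIX + WATERMARK + '.' + index + '.' + suffix;
  }

  // Each spec is a {rowtimeColumn, watermarkExpression} pair.
  static Map<String, String> toProperties(String[][] specs) {
    Map<String, String> properties = new LinkedHashMap<>();
    for (int i = 0; i < specs.length; i++) {
      properties.put(key(i, ROWTIME), specs[i][0]);
      properties.put(key(i, STRATEGY_EXPR), specs[i][1]);
    }
    return properties;
  }

  // Counts watermark specs by looking for prefixed "...rowtime" keys only.
  static long countWatermarks(Map<String, String> properties) {
    return properties.keySet().stream()
        .filter(k -> k.startsWith(FLINK_PREFIX + WATERMARK) && k.endsWith('.' + ROWTIME))
        .count();
  }

  public static void main(String[] args) {
    Map<String, String> props =
        toProperties(new String[][] {{"testTime", "`testTime` - INTERVAL '5' SECOND"}});
    // An unrelated table property must not be mistaken for a watermark key.
    props.put("write.format.default", "parquet");
    props.forEach((k, v) -> System.out.println(k + " = " + v));
    System.out.println("watermark specs: " + countWatermarks(props));
  }
}
```

Namespacing the keys this way keeps Flink-specific entries from colliding with properties that Iceberg or other engines store on the same table.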






[GitHub] [iceberg] stevenzwu commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r778264841



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -377,6 +378,23 @@ void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
     PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
 
     ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+
+    List<WatermarkSpec> watermarkSpecs = table.getSchema().getWatermarkSpecs();

Review comment:
       It seems that `TableSchema` is deprecated. Should we switch to `getUnresolvedSchema` here?






[GitHub] [iceberg] stevenzwu commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r778281534



##########
File path: flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
##########
@@ -306,7 +307,7 @@ public void testConvertFlinkSchemaWithPrimaryKeys() {
         Sets.newHashSet(1, 2)
     );
 
-    TableSchema tableSchema = FlinkSchemaUtil.toSchema(icebergSchema);
+    TableSchema tableSchema = FlinkSchemaUtil.toSchema(icebergSchema, Maps.newHashMap());

Review comment:
       Maybe add unit test coverage for the watermark spec properties.






[GitHub] [iceberg] yittg commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
yittg commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r815695622



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java
##########
@@ -108,10 +108,10 @@ public LogicalType primitive(Type.PrimitiveType primitive) {
         Types.TimestampType timestamp = (Types.TimestampType) primitive;
         if (timestamp.shouldAdjustToUTC()) {
           // MICROS
-          return new LocalZonedTimestampType(6);
+          return new LocalZonedTimestampType(3);

Review comment:
       Yes, we should not change it here. It can be checked outside.

##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
##########
@@ -149,14 +158,44 @@ public static TableSchema toSchema(RowType rowType) {
    * @param schema iceberg schema to convert.
    * @return Flink TableSchema.
    */
-  public static TableSchema toSchema(Schema schema) {
+  public static TableSchema toSchema(Schema schema, Map<String, String> properties) {
     TableSchema.Builder builder = TableSchema.builder();
 
     // Add columns.
     for (RowType.RowField field : convert(schema).getFields()) {
       builder.field(field.getName(), TypeConversions.fromLogicalToDataType(field.getType()));
     }
 
+    // Add watermark
+    final int watermarkCount =

Review comment:
       We can check whether a column is used as a watermark column and, if so, set its field type to TIMESTAMP(3).






[GitHub] [iceberg] kingeasternsun edited a comment on pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
kingeasternsun edited a comment on pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#issuecomment-1064887971


   Hello everyone, I think we may be able to solve this with the Iceberg connector alone, with no need to modify code. @hameizi @yittg
   
   First, use Flink to create an Iceberg table without computed columns or a watermark, like this:
   ```java
       tenv.executeSql("CREATE CATALOG iceberg_catalog WITH (\n" +
           "  'type'='iceberg',\n" +
           "  'catalog-type'='hive'," +
           "  'uri'='thrift://host:9083'," +
           "  'warehouse'='hdfs://host:9009/user/hive/warehouse/'" +
           ")");
   
       tenv.useCatalog("iceberg_catalog");
       tenv.useDatabase("iceberg_db");
   
       String tableName = "iceberg_origin";
   
       tenv.executeSql("create table if not exists  " + tableName + "(\n" +
           " order_id    STRING,\n" +
           " price       DECIMAL(32,2),\n" +
           " currency    STRING,\n" +
           " origin_order_time TIMESTAMP(6)\n" +
           ") WITH (\n" +
           " 'format-version' = '2'\n" +
           ")"
       );
   ```
   
   Then use the Iceberg connector to create a table source with computed columns and watermark support, like this, @stevenzwu:
   ```java
       tenv.executeSql("create table if not exists  default_catalog.default_database.iceberg_table (\n" +
           " order_id    STRING,\n" +
           " price       DECIMAL(32,2),\n" +
           " currency    STRING,\n" +
           " origin_order_time TIMESTAMP(6),\n" +
           " order_time as cast(origin_order_time as TIMESTAMP(3)),\n" +
           " WATERMARK FOR order_time AS order_time - INTERVAL '5' MINUTE\n" +
           ") WITH (\n" +
           " 'connector' = 'iceberg',\n" +
           " 'catalog-name' = 'iceberg_catalog',\n" +
           " 'catalog-database' = 'iceberg_db',\n" +
           " 'catalog-table' = 'iceberg_origin',\n" +
           "  'uri'='thrift://host:9083'," +
           "  'warehouse'='hdfs://host:9009/user/hive/warehouse/'" +
           ")"
       );
   
   ```
   
   If a different watermark strategy is needed, we just need to create another table source under a different name. For example, with a 10 MINUTE watermark:
   ```java
       tenv.executeSql("create table if not exists  default_catalog.default_database.iceberg_table_10min (\n" +
           " order_id    STRING,\n" +
           " price       DECIMAL(32,2),\n" +
           " currency    STRING,\n" +
           " origin_order_time TIMESTAMP(6),\n" +
           " order_time as cast(origin_order_time as TIMESTAMP(3)),\n" +
           " WATERMARK FOR order_time AS order_time - INTERVAL '10' MINUTE\n" +
           ") WITH (\n" +
           " 'connector' = 'iceberg',\n" +
           " 'catalog-name' = 'iceberg_catalog',\n" +
           " 'catalog-database' = 'iceberg_db',\n" +
           " 'catalog-table' = 'iceberg_origin',\n" +
           "  'uri'='thrift://host:9083'," +
           "  'warehouse'='hdfs://host:9009/user/hive/warehouse/'" +
           ")"
       );
   
   ```




[GitHub] [iceberg] Reo-LEI commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r824568301



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -474,10 +501,6 @@ private static void validateFlinkTable(CatalogBaseTable table) {
         throw new UnsupportedOperationException("Creating table with computed columns is not supported yet.");
       }
     });
-
-    if (!schema.getWatermarkSpecs().isEmpty()) {
-      throw new UnsupportedOperationException("Creating table with watermark specs is not supported yet.");

Review comment:
       As I mentioned [here](https://github.com/apache/iceberg/issues/2264#issuecomment-877808285) before, maybe we just need to remove this validation, as @kingeasternsun's [comment](https://github.com/apache/iceberg/pull/3681#issuecomment-1064887971) suggests.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r778985127



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java
##########
@@ -108,10 +108,10 @@ public LogicalType primitive(Type.PrimitiveType primitive) {
         Types.TimestampType timestamp = (Types.TimestampType) primitive;
         if (timestamp.shouldAdjustToUTC()) {
           // MICROS
-          return new LocalZonedTimestampType(6);
+          return new LocalZonedTimestampType(3);

Review comment:
       Iceberg timestamps have microsecond precision only, so I am afraid we can't do this.
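
To make the concern concrete, here is a minimal sketch using only `java.time` (not Iceberg or Flink classes; the class and method names are made up for this example). It shows that narrowing a microsecond timestamp to millisecond precision makes two distinct values indistinguishable, which is presumably why the conversion in `TypeToFlinkType` should keep precision 6.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

public class TimestampPrecisionDemo {
  // Drops everything below millisecond precision, as a TIMESTAMP(3) column would.
  static LocalDateTime toMillis(LocalDateTime micros) {
    return micros.truncatedTo(ChronoUnit.MILLIS);
  }

  public static void main(String[] args) {
    // Two timestamps that differ only in their microsecond digits.
    LocalDateTime a = LocalDateTime.parse("2021-12-07T06:09:22.123456");
    LocalDateTime b = LocalDateTime.parse("2021-12-07T06:09:22.123999");

    System.out.println(a.equals(b));                     // false
    System.out.println(toMillis(a).equals(toMillis(b))); // true
  }
}
```

A computed column that casts to TIMESTAMP(3) only for the watermark, as suggested elsewhere in this thread, avoids the loss because the stored column keeps its original precision.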






[GitHub] [iceberg] stevenzwu commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r778992289



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
##########
@@ -149,14 +156,43 @@ public static TableSchema toSchema(RowType rowType) {
    * @param schema iceberg schema to convert.
    * @return Flink TableSchema.
    */
-  public static TableSchema toSchema(Schema schema) {
+  public static TableSchema toSchema(Schema schema, Map<String, String> properties) {
     TableSchema.Builder builder = TableSchema.builder();
 
     // Add columns.
     for (RowType.RowField field : convert(schema).getFields()) {
       builder.field(field.getName(), TypeConversions.fromLogicalToDataType(field.getType()));
     }
 
+    // Add watermark
+    final int watermarkCount =
+        properties.keySet().stream()
+            .filter(
+                (k) ->

Review comment:
       ok. sounds good to me






[GitHub] [iceberg] hameizi closed pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
hameizi closed pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681


   




[GitHub] [iceberg] kingeasternsun commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
kingeasternsun commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r817389286



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
##########
@@ -149,14 +158,44 @@ public static TableSchema toSchema(RowType rowType) {
    * @param schema iceberg schema to convert.
    * @return Flink TableSchema.
    */
-  public static TableSchema toSchema(Schema schema) {
+  public static TableSchema toSchema(Schema schema, Map<String, String> properties) {
     TableSchema.Builder builder = TableSchema.builder();
 
     // Add columns.
     for (RowType.RowField field : convert(schema).getFields()) {
       builder.field(field.getName(), TypeConversions.fromLogicalToDataType(field.getType()));
     }
 
+    // Add watermark
+    final int watermarkCount =

Review comment:
       > We can check if a column is used as watermark column, and add its field type as TIMESTAMP(3)
   
   Maybe like this?  @yittg 
   ```java
     public static TableSchema toSchema(Schema schema, Map<String, String> properties) {
       TableSchema.Builder builder = TableSchema.builder();
       Map<String, DataType> watermarkRow = new HashMap<>();
   
       // Add watermark
       final int watermarkCount =
           properties.keySet().stream()
               .filter(
                   (k) ->
                       k.startsWith(FLINK_PREFIX + DescriptorProperties.WATERMARK) &&
                           k.endsWith('.' + DescriptorProperties.WATERMARK_ROWTIME))
               .mapToInt((k) -> 1)
               .sum();
       if (watermarkCount > 0) {
         for (int i = 0; i < watermarkCount; i++) {
           final String rowtimeKey =
               FLINK_PREFIX + DescriptorProperties.WATERMARK + '.' + i + '.' + DescriptorProperties.WATERMARK_ROWTIME;
           final String exprKey =
               FLINK_PREFIX + DescriptorProperties.WATERMARK + '.' + i + '.' +
                   DescriptorProperties.WATERMARK_STRATEGY_EXPR;
           final String typeKey =
               FLINK_PREFIX + DescriptorProperties.WATERMARK + '.' + i + '.' +
                   DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
           final String rowtime =
               optionalGet(rowtimeKey, properties).orElseThrow(exceptionSupplier(rowtimeKey));
           final String exprString =
               optionalGet(exprKey, properties).orElseThrow(exceptionSupplier(exprKey));
           final String typeString =
               optionalGet(typeKey, properties).orElseThrow(exceptionSupplier(typeKey));
           final DataType exprType =
               TypeConversions.fromLogicalToDataType(LogicalTypeParser.parse(typeString));
           builder.watermark(rowtime, exprString, exprType);
           watermarkRow.put(rowtime, exprType);
         }
       }
   
       // Add columns.
       for (RowType.RowField field : convert(schema).getFields()) {
         builder.field(
             field.getName(),
             watermarkRow.getOrDefault(field.getName(), TypeConversions.fromLogicalToDataType(field.getType())));
       }
   
       // Add primary key.
       Set<Integer> identifierFieldIds = schema.identifierFieldIds();
       if (!identifierFieldIds.isEmpty()) {
         List<String> columns = Lists.newArrayListWithExpectedSize(identifierFieldIds.size());
         for (Integer identifierFieldId : identifierFieldIds) {
           String columnName = schema.findColumnName(identifierFieldId);
           Preconditions.checkNotNull(columnName, "Cannot find field with id %s in schema %s", identifierFieldId, schema);
   
           columns.add(columnName);
         }
         builder.primaryKey(columns.toArray(new String[0]));
       }
   
       return builder.build();
     }
   
   ```






[GitHub] [iceberg] hameizi removed a comment on pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
hameizi removed a comment on pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#issuecomment-1057788083


   > I have another concern about this change: should we bind the watermark strategy within the table metadata? I prefer binding it with the job instead of the table.
   
   @yittg 




[GitHub] [iceberg] yittg commented on pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
yittg commented on pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#issuecomment-1057732451


   I have another concern about this change: should we bind the watermark strategy within the table metadata? I prefer binding it with the job instead of the table.




[GitHub] [iceberg] kingeasternsun commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
kingeasternsun commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r817389286



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
##########
@@ -149,14 +158,44 @@ public static TableSchema toSchema(RowType rowType) {
    * @param schema iceberg schema to convert.
    * @return Flink TableSchema.
    */
-  public static TableSchema toSchema(Schema schema) {
+  public static TableSchema toSchema(Schema schema, Map<String, String> properties) {
     TableSchema.Builder builder = TableSchema.builder();
 
     // Add columns.
     for (RowType.RowField field : convert(schema).getFields()) {
       builder.field(field.getName(), TypeConversions.fromLogicalToDataType(field.getType()));
     }
 
+    // Add watermark
+    final int watermarkCount =

Review comment:
       > We can check if a column is used as watermark column, and add its field type as TIMESTAMP(3)
   
   Maybe like this?  @yittg 
   ```java
     public static TableSchema toSchema(Schema schema, Map<String, String> properties) {
       TableSchema.Builder builder = TableSchema.builder();
       Map<String, DataType> watermarkRow = new HashMap<>();
   
       // Add watermark
       final int watermarkCount =
           properties.keySet().stream()
               .filter(
                   (k) ->
                       k.startsWith(FLINK_PREFIX + DescriptorProperties.WATERMARK) &&
                           k.endsWith('.' + DescriptorProperties.WATERMARK_ROWTIME))
               .mapToInt((k) -> 1)
               .sum();
       if (watermarkCount > 0) {
         for (int i = 0; i < watermarkCount; i++) {
           final String rowtimeKey =
               FLINK_PREFIX + DescriptorProperties.WATERMARK + '.' + i + '.' + DescriptorProperties.WATERMARK_ROWTIME;
           final String exprKey =
               FLINK_PREFIX + DescriptorProperties.WATERMARK + '.' + i + '.' +
                   DescriptorProperties.WATERMARK_STRATEGY_EXPR;
           final String typeKey =
               FLINK_PREFIX + DescriptorProperties.WATERMARK + '.' + i + '.' +
                   DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
           final String rowtime =
               optionalGet(rowtimeKey, properties).orElseThrow(exceptionSupplier(rowtimeKey));
           final String exprString =
               optionalGet(exprKey, properties).orElseThrow(exceptionSupplier(exprKey));
           final String typeString =
               optionalGet(typeKey, properties).orElseThrow(exceptionSupplier(typeKey));
           final DataType exprType =
               TypeConversions.fromLogicalToDataType(LogicalTypeParser.parse(typeString));
           builder.watermark(rowtime, exprString, exprType);
           watermarkRow.put(rowtime, exprType);
         }
       }
   
       // Add columns.
       for (RowType.RowField field : convert(schema).getFields()) {
         builder.field(
             field.getName(),
             watermarkRow.getOrDefault(field.getName(), TypeConversions.fromLogicalToDataType(field.getType())));
       }
   
       // Add primary key.
       Set<Integer> identifierFieldIds = schema.identifierFieldIds();
       if (!identifierFieldIds.isEmpty()) {
         List<String> columns = Lists.newArrayListWithExpectedSize(identifierFieldIds.size());
         for (Integer identifierFieldId : identifierFieldIds) {
           String columnName = schema.findColumnName(identifierFieldId);
           Preconditions.checkNotNull(columnName, "Cannot find field with id %s in schema %s", identifierFieldId, schema);
   
           columns.add(columnName);
         }
         builder.primaryKey(columns.toArray(new String[0]));
       }
   
       return builder.build();
     }
   
   ```






[GitHub] [iceberg] stevenzwu edited a comment on pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
stevenzwu edited a comment on pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#issuecomment-1064472790


   > I have another concern about this change: should we bind the watermark strategy within the table metadata? I prefer binding it with the job instead of the table.
   
   +1 on @yittg's suggestion. Different jobs may want to set the watermark differently.




[GitHub] [iceberg] stevenzwu commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r778991990



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
##########
@@ -149,14 +158,44 @@ public static TableSchema toSchema(RowType rowType) {
    * @param schema iceberg schema to convert.
    * @return Flink TableSchema.
    */
-  public static TableSchema toSchema(Schema schema) {
+  public static TableSchema toSchema(Schema schema, Map<String, String> properties) {
     TableSchema.Builder builder = TableSchema.builder();
 
     // Add columns.
     for (RowType.RowField field : convert(schema).getFields()) {
       builder.field(field.getName(), TypeConversions.fromLogicalToDataType(field.getType()));
     }
 
+    // Add watermark
+    final int watermarkCount =
+        properties.keySet().stream()
+            .filter(
+                (k) ->
+                    k.startsWith(flinkPrefix + DescriptorProperties.WATERMARK) &&
+                        k.endsWith('.' + DescriptorProperties.WATERMARK_ROWTIME))
+            .mapToInt((k) -> 1)
+            .sum();
+    if (watermarkCount > 0) {
+      for (int i = 0; i < watermarkCount; i++) {
+        final String rowtimeKey =
+            flinkPrefix + DescriptorProperties.WATERMARK + '.' + i + '.' + DescriptorProperties.WATERMARK_ROWTIME;

Review comment:
       I saw you used a static import for this constant in the test class. Do you want to apply it here too, for consistency?






[GitHub] [iceberg] stevenzwu commented on a change in pull request #3681: Add watermark in flink table

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r778278408



##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
##########
@@ -149,14 +156,43 @@ public static TableSchema toSchema(RowType rowType) {
    * @param schema iceberg schema to convert.
    * @return Flink TableSchema.
    */
-  public static TableSchema toSchema(Schema schema) {
+  public static TableSchema toSchema(Schema schema, Map<String, String> properties) {
     TableSchema.Builder builder = TableSchema.builder();
 
     // Add columns.
     for (RowType.RowField field : convert(schema).getFields()) {
       builder.field(field.getName(), TypeConversions.fromLogicalToDataType(field.getType()));
     }
 
+    // Add watermark
+    final int watermarkCount =
+        properties.keySet().stream()
+            .filter(
+                (k) ->

Review comment:
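       Should we use a Java regex matcher here for a more accurate match? The `startsWith`/`endsWith` pair also accepts keys that merely share the prefix and suffix.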
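
       A minimal sketch of what regex-based matching could look like, not the PR's actual code. The class name, the `countWatermarks` helper, and the literal values of `FLINK_PREFIX`, `WATERMARK` and `WATERMARK_ROWTIME` are assumptions for illustration; in the PR they would come from `flinkPrefix` and the `DescriptorProperties` constants.

```java
import java.util.Map;
import java.util.regex.Pattern;

class WatermarkKeyMatcher {
  // Assumed literal values, used only to keep this sketch self-contained.
  static final String FLINK_PREFIX = "flink.";
  static final String WATERMARK = "watermark";
  static final String WATERMARK_ROWTIME = "rowtime";

  // Matches exactly "<prefix>watermark.<index>.rowtime", where <index> is one or more digits.
  static final Pattern ROWTIME_KEY =
      Pattern.compile(
          Pattern.quote(FLINK_PREFIX + WATERMARK)
              + "\\.(\\d+)\\."
              + Pattern.quote(WATERMARK_ROWTIME));

  // Counts the property keys that are well-formed watermark rowtime keys.
  static long countWatermarks(Map<String, String> properties) {
    return properties.keySet().stream()
        .filter(k -> ROWTIME_KEY.matcher(k).matches())
        .count();
  }
}
```

       With this pattern, a key such as `flink.watermark-extra.rowtime` is rejected, whereas the prefix/suffix check would count it. The captured group also exposes the index, should the loop need it.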



