Posted to issues@paimon.apache.org by "JingsongLi (via GitHub)" <gi...@apache.org> on 2023/03/21 04:34:54 UTC

[GitHub] [incubator-paimon] JingsongLi opened a new pull request, #673: [flink] Introduce watermark alignment options

JingsongLi opened a new pull request, #673:
URL: https://github.com/apache/incubator-paimon/pull/673

   ### Purpose
   
   - Introduce watermark alignment options
   - Fix watermark generation
   
   ### Tests
   
   WatermarkITCase
   
   ### API and Format 
   
   Introduce options.
   
   ### Documentation
   
   Documented.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] SteNicholas commented on a diff in pull request #673: [flink] Introduce watermark alignment options

Posted by "SteNicholas (via GitHub)" <gi...@apache.org>.
SteNicholas commented on code in PR #673:
URL: https://github.com/apache/incubator-paimon/pull/673#discussion_r1143025344


##########
docs/content/how-to/querying-tables.md:
##########
@@ -87,9 +87,97 @@ Users can also adjust `changelog-producer` table property to specify the pattern
 
 {{< img src="/img/scan-mode.png">}}
 
-{{< hint info >}}
+## Streaming Read
+
+Streaming Read is only support in Flink SQL currently.

Review Comment:
   ```suggestion
   Streaming source behavior is only supported in the Flink engine at present.
   ```





[GitHub] [incubator-paimon] JingsongLi merged pull request #673: [flink] Introduce watermark alignment options

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi merged PR #673:
URL: https://github.com/apache/incubator-paimon/pull/673




[GitHub] [incubator-paimon] JingsongLi commented on pull request #673: [flink] Introduce watermark alignment options

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on PR #673:
URL: https://github.com/apache/incubator-paimon/pull/673#issuecomment-1477365413

   Consistent with https://cwiki.apache.org/confluence/display/FLINK/FLIP-296%3A+Extend+watermark-related+features+for+SQL




[GitHub] [incubator-paimon] JingsongLi commented on a diff in pull request #673: [flink] Introduce watermark alignment options

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #673:
URL: https://github.com/apache/incubator-paimon/pull/673#discussion_r1143042419


##########
docs/content/how-to/querying-tables.md:
##########
@@ -87,9 +87,97 @@ Users can also adjust `changelog-producer` table property to specify the pattern
 
 {{< img src="/img/scan-mode.png">}}
 
-{{< hint info >}}
+## Streaming Read
+
+Streaming Read is only support in Flink SQL currently.
+
+### Watermark Definition

Review Comment:
   I'll keep this as `###`; Streaming Source can be a separate section under Scan Mode.





[GitHub] [incubator-paimon] SteNicholas commented on a diff in pull request #673: [flink] Introduce watermark alignment options

Posted by "SteNicholas (via GitHub)" <gi...@apache.org>.
SteNicholas commented on code in PR #673:
URL: https://github.com/apache/incubator-paimon/pull/673#discussion_r1143024854


##########
docs/content/how-to/querying-tables.md:
##########
@@ -87,9 +87,97 @@ Users can also adjust `changelog-producer` table property to specify the pattern
 
 {{< img src="/img/scan-mode.png">}}
 
-{{< hint info >}}
+## Streaming Read

Review Comment:
   ```suggestion
   ### Streaming Source
   ```





[GitHub] [incubator-paimon] SteNicholas commented on a diff in pull request #673: [flink] Introduce watermark alignment options

Posted by "SteNicholas (via GitHub)" <gi...@apache.org>.
SteNicholas commented on code in PR #673:
URL: https://github.com/apache/incubator-paimon/pull/673#discussion_r1142970980


##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.utils.BlockingIterator;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** SQL ITCase for watermark definition. */
+public class WatermarkITCase extends CatalogITCaseBase {
+
+    @Override
+    protected int defaultParallelism() {
+        return 1;
+    }
+
+    @Test
+    public void testWatermark() throws Exception {
+        innerTestWatermark();
+    }
+
+    @Test
+    public void testWatermarkAlignment() throws Exception {
+        innerTestWatermark(
+                "'scan.watermark.idle-timeout'='1s'",
+                "'scan.watermark.alignment.group'='group'",
+                "'scan.watermark.alignment.max-drift'='1s',");
+    }
+
+    private void innerTestWatermark(String... options) throws Exception {
+        sql(
+                "CREATE TABLE T (f0 INT, ts TIMESTAMP(3), WATERMARK FOR ts AS ts) WITH ("
+                        + "'write-mode'='append-only',"
+                        + String.join(",", options)
+                        + " 'scan.watermark.emit.strategy'='on-event')");

Review Comment:
   Please add a test case for the `on-periodic` strategy.
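   A minimal sketch of how the `on-periodic` case could be covered, assuming the test keeps building its DDL from string options. `WatermarkDdl.createTableDdl` is a hypothetical helper, not part of Paimon or this PR: it takes the emit strategy as a parameter and lets `String.join` place the commas, so no option needs the trailing comma that `innerTestWatermark` currently relies on. The keys follow the dotted `scan.watermark.alignment.*` names from the documentation table.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical DDL builder for a watermark test; not part of Paimon. */
final class WatermarkDdl {

    /**
     * Builds the CREATE TABLE statement used by the test. The emit strategy is explicit so the
     * same helper serves both 'on-event' and 'on-periodic'.
     */
    static String createTableDdl(String emitStrategy, String... extraOptions) {
        List<String> options = new ArrayList<>();
        options.add("'write-mode'='append-only'");
        options.addAll(Arrays.asList(extraOptions));
        options.add("'scan.watermark.emit.strategy'='" + emitStrategy + "'");
        // String.join places the separators, so no option has to end with a comma.
        return "CREATE TABLE T (f0 INT, ts TIMESTAMP(3), WATERMARK FOR ts AS ts) WITH ("
                + String.join(", ", options)
                + ")";
    }

    public static void main(String[] args) {
        System.out.println(createTableDdl("on-event"));
        System.out.println(
                createTableDdl(
                        "on-periodic",
                        "'scan.watermark.idle-timeout'='1s'",
                        "'scan.watermark.alignment.group'='group'",
                        "'scan.watermark.alignment.max-drift'='1s'"));
    }
}
```

   With `on-periodic`, the documentation table says the emit interval is controlled by Flink's `pipeline.auto-watermark-interval`, so a periodic test may need to account for that interval instead of expecting one watermark per record.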





[GitHub] [incubator-paimon] SteNicholas commented on a diff in pull request #673: [flink] Introduce watermark alignment options

Posted by "SteNicholas (via GitHub)" <gi...@apache.org>.
SteNicholas commented on code in PR #673:
URL: https://github.com/apache/incubator-paimon/pull/673#discussion_r1143026067


##########
docs/content/how-to/querying-tables.md:
##########
@@ -87,9 +87,97 @@ Users can also adjust `changelog-producer` table property to specify the pattern
 
 {{< img src="/img/scan-mode.png">}}
 
-{{< hint info >}}
+## Streaming Read
+
+Streaming Read is only support in Flink SQL currently.
+
+### Watermark Definition

Review Comment:
   ```suggestion
   #### Watermark Definition
   ```



##########
docs/content/how-to/querying-tables.md:
##########
@@ -87,9 +87,97 @@ Users can also adjust `changelog-producer` table property to specify the pattern
 
 {{< img src="/img/scan-mode.png">}}
 
-{{< hint info >}}
+## Streaming Read
+
+Streaming Read is only support in Flink SQL currently.
+
+### Watermark Definition
+
+You can define a watermark for reading Paimon tables:
+
+```sql
+CREATE TABLE T (
+    `user` BIGINT,
+    product STRING,
+    order_time TIMESTAMP(3),
+    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
+);
+
+-- launch a streaming job that aggregates T in 10-minute tumbling windows
+SELECT window_start, window_end, COUNT(`user`) FROM TABLE(
+ TUMBLE(TABLE T, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end;
+```
+
+You can also enable [Flink Watermark alignment](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment-_beta_),
+which ensures that no source, split, shard, or partition advances its watermark too far ahead of the others.
+
+Watermark related options:
+
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>scan.watermark.alignment.group</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>A group of sources to align watermarks.</td>
+        </tr>
+        <tr>
+            <td><h5>scan.watermark.alignment.max-drift</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>Maximal drift to align watermarks, before we pause consuming from the source/task/partition.</td>
+        </tr>
+        <tr>
+            <td><h5>scan.watermark.alignment.update-interval</h5></td>
+            <td style="word-wrap: break-word;">1 s</td>
+            <td>Duration</td>
+            <td>How often tasks should notify the coordinator about their current watermark and how often the coordinator should announce the maximal aligned watermark.</td>
+        </tr>
+        <tr>
+            <td><h5>scan.watermark.emit.strategy</h5></td>
+            <td style="word-wrap: break-word;">on-event</td>
+            <td><p>Enum</p></td>
+            <td>Emit strategy for watermark generation.<br /><br />Possible values:<ul><li>"on-periodic": Emit watermark periodically, interval is controlled by Flink 'pipeline.auto-watermark-interval'.</li><li>"on-event": Emit watermark per record.</li></ul></td>
+        </tr>
+        <tr>
+            <td><h5>scan.watermark.idle-timeout</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>If no records flow in a partition of a stream for that amount of time, then that partition is considered "idle" and will not hold back the progress of watermarks in downstream operators.</td>
+        </tr>
+    </tbody>
+</table>
+
+### Bounded Stream

Review Comment:
   ```suggestion
   #### Bounded Stream
   ```
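   The `scan.watermark.idle-timeout` row in the table above can be illustrated with a simplified model. This is an assumption-laden sketch, not Flink's implementation: the combined watermark is taken as the minimum over partitions that received a record within the timeout, and `IdleAwareWatermark` and its methods are hypothetical names.

```java
import java.time.Duration;
import java.util.List;

/**
 * Simplified model (an assumption, not Flink's implementation) of how an idle timeout keeps a
 * silent partition from holding back the combined watermark.
 */
final class IdleAwareWatermark {

    /** One partition: its current watermark and the processing time of its last record (ms). */
    static final class Partition {
        final long watermark;
        final long lastRecordMillis;

        Partition(long watermark, long lastRecordMillis) {
            this.watermark = watermark;
            this.lastRecordMillis = lastRecordMillis;
        }
    }

    /**
     * Minimum watermark over the non-idle partitions. A null idleTimeout disables idleness
     * detection, so every partition counts. Returns Long.MIN_VALUE if all partitions are idle.
     */
    static long combinedWatermark(List<Partition> partitions, long nowMillis, Duration idleTimeout) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Partition p : partitions) {
            boolean idle =
                    idleTimeout != null && nowMillis - p.lastRecordMillis >= idleTimeout.toMillis();
            if (!idle) {
                anyActive = true;
                min = Math.min(min, p.watermark);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        List<Partition> partitions =
                List.of(
                        new Partition(10_000, 9_500), // received a record 0.5 s ago
                        new Partition(2_000, 1_000)); // silent for 9 s
        long now = 10_000;
        // Without an idle timeout the silent partition holds the watermark back.
        System.out.println(combinedWatermark(partitions, now, null)); // 2000
        // With 'scan.watermark.idle-timeout'='1s' the silent partition is ignored.
        System.out.println(combinedWatermark(partitions, now, Duration.ofSeconds(1))); // 10000
    }
}
```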





[GitHub] [incubator-paimon] SteNicholas commented on a diff in pull request #673: [flink] Introduce watermark alignment options

Posted by "SteNicholas (via GitHub)" <gi...@apache.org>.
SteNicholas commented on code in PR #673:
URL: https://github.com/apache/incubator-paimon/pull/673#discussion_r1143031686


##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/WatermarkITCase.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.utils.BlockingIterator;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** SQL ITCase for watermark definition. */
+public class WatermarkITCase extends CatalogITCaseBase {
+
+    @Override
+    protected int defaultParallelism() {
+        return 1;
+    }
+
+    @Test
+    public void testWatermark() throws Exception {
+        innerTestWatermark();
+    }
+
+    @Test
+    public void testWatermarkAlignment() throws Exception {
+        innerTestWatermark(
+                "'scan.watermark.idle-timeout'='1s'",
+                "'scan.watermark.emit.strategy'='on-event'",
+                "'scan.watermark.alignment.group'='group'",
+                "'scan.watermark.alignment.max-drift'='1s',");

Review Comment:
   No test for the config option `scan.watermark.alignment.update-interval`.
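   For context on what such a test would exercise, here is a simplified model of one alignment round, assuming the behavior described in the option table: every `scan.watermark.alignment.update-interval` the coordinator takes the smallest watermark reported in the group and adds `scan.watermark.alignment.max-drift`, and sources ahead of that bound pause until the next round. `WatermarkAlignmentRound` is a hypothetical class that greatly simplifies Flink's coordinator logic.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Simplified model (an assumption, not Flink's coordinator code) of one watermark-alignment
 * round for a single alignment group.
 */
final class WatermarkAlignmentRound {

    /** Bound announced by the coordinator: smallest reported watermark plus the max drift. */
    static long maxAllowedWatermark(Map<String, Long> reported, Duration maxDrift) {
        long min =
                reported.values().stream()
                        .mapToLong(Long::longValue)
                        .min()
                        .orElseThrow(() -> new IllegalArgumentException("no source reported"));
        return min + maxDrift.toMillis();
    }

    /** Sources whose watermark is ahead of the announced bound pause until the next round. */
    static List<String> pausedSources(Map<String, Long> reported, Duration maxDrift) {
        long maxAllowed = maxAllowedWatermark(reported, maxDrift);
        List<String> paused = new ArrayList<>();
        for (Map.Entry<String, Long> entry : reported.entrySet()) {
            if (entry.getValue() > maxAllowed) {
                paused.add(entry.getKey());
            }
        }
        return paused;
    }

    public static void main(String[] args) {
        // Watermarks (ms) reported by three splits in the same group; TreeMap keeps output ordered.
        Map<String, Long> reported = new TreeMap<>();
        reported.put("split-0", 10_000L);
        reported.put("split-1", 10_800L);
        reported.put("split-2", 12_500L);
        Duration maxDrift = Duration.ofSeconds(1); // 'scan.watermark.alignment.max-drift'='1s'

        System.out.println(maxAllowedWatermark(reported, maxDrift)); // 11000
        System.out.println(pausedSources(reported, maxDrift)); // [split-2]
    }
}
```

   In this model the bound is only recomputed once per update interval, so a paused source may stay paused for up to one interval after the slowest source has caught up.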


