Posted to dev@flink.apache.org by "Thilo Schneider (Jira)" <ji...@apache.org> on 2020/11/12 16:45:00 UTC

[jira] [Created] (FLINK-20128) Data loss for over windows with rows unbounded preceding

Thilo Schneider created FLINK-20128:
---------------------------------------

             Summary: Data loss for over windows with rows unbounded preceding
                 Key: FLINK-20128
                 URL: https://issues.apache.org/jira/browse/FLINK-20128
             Project: Flink
          Issue Type: Bug
          Components: API / Python, Table SQL / Planner
    Affects Versions: 1.11.2, 1.12.0
            Reporter: Thilo Schneider


When using partitioned, unbounded over windows, all partitions but one are dropped from the output dataset:
{code:python}
# Setup
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t_env.get_config().get_configuration().set_integer("table.exec.resource.default-parallelism", 1)

t_env.execute_sql("""
CREATE TABLE datagen (
 foo INT,
 id AS mod(foo, 2),
 message_time AS to_timestamp(from_unixtime(FLOOR(foo/2))),
 WATERMARK FOR message_time AS message_time
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='2',
 'fields.foo.kind'='sequence',
 'fields.foo.start'='0',
 'fields.foo.end'='19'
)""")
t_env.execute_sql("CREATE TABLE output (foo INT, id INT, lagfoo INT) WITH ('connector' = 'print')")

{code}
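As a sanity check, the raw source can be routed to a second print sink to verify that rows for both id values arrive upstream of any over window (a minimal sketch; the raw_output table name is only for illustration):
{code:python}
# Sanity check (sketch): write the raw datagen rows to a second print sink
# to confirm that both partitions (id == 0 and id == 1) are produced before
# any over window is applied. The table name raw_output is illustrative only.
t_env.execute_sql("CREATE TABLE raw_output (foo INT, id INT) WITH ('connector' = 'print')")
t_env.sql_query("SELECT foo, id FROM datagen").execute_insert("raw_output")
{code}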
Using bounded over windows, everything works as expected:
{code:python}
t = t_env.sql_query("""
    SELECT foo, id, avg(foo) OVER w AS lagfoo 
    FROM datagen 
    WINDOW w AS (PARTITION BY id ORDER BY message_time ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)""")
t.execute_insert("output")
{code}
yields
{code}
+I(0,0,0)
+I(1,1,1)
+I(2,0,1)
+I(3,1,2)
+I(4,0,3)
...
{code}
If we change the window to unbounded preceding:
{code:python}
t = t_env.sql_query("""
    SELECT foo, id, avg(foo) OVER w AS lagfoo 
    FROM datagen 
    WINDOW w AS (PARTITION BY id ORDER BY message_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)""")
t.execute_insert("output")
{code}
we lose all rows with id == 1:
{code}
+I(0,0,0)
+I(2,0,1)
+I(4,0,2)
+I(6,0,3)
+I(8,0,4)
...
{code}

I observed this problem with various aggregate functions, both under 1.11.2 and 1.12.0 RC1.
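For reference, one such variant (a sketch, not verbatim what I ran): the same unbounded window with max() instead of avg(), keeping the INT result type expected by the output table.
{code:python}
# One of the "various aggregate functions" variants mentioned above (sketch):
# same unbounded-preceding window, but with max() instead of avg().
# max(foo) keeps the INT type declared for lagfoo in the output table.
t = t_env.sql_query("""
    SELECT foo, id, max(foo) OVER w AS lagfoo
    FROM datagen
    WINDOW w AS (PARTITION BY id ORDER BY message_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)""")
t.execute_insert("output")
{code}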


