Posted to dev@flink.apache.org by "Huang Xingbo (Jira)" <ji...@apache.org> on 2023/02/16 07:23:00 UTC

[jira] [Created] (FLINK-31099) Chained WindowOperator throws NPE in PyFlink ThreadMode

Huang Xingbo created FLINK-31099:
------------------------------------

             Summary: Chained WindowOperator throws NPE in PyFlink ThreadMode
                 Key: FLINK-31099
                 URL: https://issues.apache.org/jira/browse/FLINK-31099
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.16.1, 1.17.0
            Reporter: Huang Xingbo
            Assignee: Huang Xingbo
             Fix For: 1.17.0, 1.16.2


Test case
{code:python}
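from abc import ABC

from pyflink.common import Configuration, Time, Types, WatermarkStrategy
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.window import TumblingEventTimeWindows

config = Configuration()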
# The issue title names thread mode, so the reproduction runs in that mode.
config.set_string("python.execution-mode", "thread")
env = StreamExecutionEnvironment.get_execution_environment(config)

class MyTimestampAssigner(TimestampAssigner, ABC):
    def extract_timestamp(self, value: tuple, record_timestamp: int) -> int:
        # The first tuple field holds the event time in epoch milliseconds.
        return value[0]

ds = env.from_collection(
    [(1676461680000, "a1", "b1", 1), (1676461680000, "a1", "b1", 1),
     (1676461680000, "a2", "b2", 1), (1676461680000, "a1", "b2", 1),
     (1676461740000, "a1", "b1", 1), (1676461740000, "a2", "b2", 1)]
).assign_timestamps_and_watermarks(
    WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(MyTimestampAssigner())
)
ds.key_by(
    lambda x: (x[0], x[1], x[2])
).window(
    TumblingEventTimeWindows.of(Time.minutes(1))
).reduce(
    lambda x, y: (x[0], x[1], x[2], x[3] + y[3]),
    output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.STRING(), Types.INT()])
# ).filter(
#     lambda x: x[1] == "a1"
).map(
    lambda x: (x[0], x[1], x[3]),
    output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.INT()])
).print()
env.execute()
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)