Posted to dev@flink.apache.org by "Huang Xingbo (Jira)" <ji...@apache.org> on 2022/08/11 07:38:00 UTC

[jira] [Created] (FLINK-28920) Release Testing: Running Python DataStream Window Job

Huang Xingbo created FLINK-28920:
------------------------------------

             Summary: Release Testing: Running Python DataStream Window Job
                 Key: FLINK-28920
                 URL: https://issues.apache.org/jira/browse/FLINK-28920
             Project: Flink
          Issue Type: Sub-task
          Components: API / Python
    Affects Versions: 1.16.0
            Reporter: Huang Xingbo
             Fix For: 1.16.0


* Build the Flink source code
{code:bash}
$ cd {flink-source-code}
$ mvn clean install -DskipTests
{code}

* Prepare a Python Virtual Environment

{code:bash}
$ cd {flink-source-code}/flink-python/dev
$ ./lint-python.sh -s basic
$ source .conda/bin/activate
{code}

* Install PyFlink from source code. For more details, refer to the [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink].
{code:bash}
$ cd {flink-source-code}/flink-python/apache-flink-libraries
$ python setup.py sdist
$ pip install dist/*.tar.gz
$ cd ..
$ pip install -r dev/dev-requirements.txt
$ python setup.py sdist
$ pip install dist/*.tar.gz
{code}
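Before moving on to the test, it can help to confirm that both source distributions actually ended up in the active environment. The following is a minimal sketch, assuming Python 3.8+ (for {{importlib.metadata}}); the helper name {{installed_version}} is hypothetical and not part of PyFlink.

```python
from importlib import metadata  # assumption: Python 3.8 or newer

# Distribution names produced by the two `python setup.py sdist` steps above.
PYFLINK_DISTRIBUTIONS = ("apache-flink-libraries", "apache-flink")


def installed_version(dist_name):
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    for name in PYFLINK_DISTRIBUTIONS:
        # Prints the version string, or a marker when the package is missing.
        print(f"{name}: {installed_version(name) or 'NOT INSTALLED'}")
```

If either line reports NOT INSTALLED, the corresponding {{pip install dist/*.tar.gz}} step probably ran in a different environment or directory.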

h1. Test
* Write a Python DataStream window job that runs in thread mode. For details on windows, refer to the [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/].

{code:python}
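from typing import Tuple

from pyflink.common import Configuration
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import AggregateFunction
from pyflink.datastream.window import EventTimeSessionWindows


# Not defined in the original snippet; assumed to take the event timestamp
# (in milliseconds) from the second tuple field, as the name suggests.
class SecondColumnTimestampAssigner(TimestampAssigner):

    def extract_timestamp(self, value, record_timestamp) -> int:
        return int(value[1])


def main():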
    config = Configuration()
    # thread mode
    config.set_string("python.execution-mode", "thread")
    # process mode
    # config.set_string("python.execution-mode", "process")
    env = StreamExecutionEnvironment.get_execution_environment(config)

    data_stream = env.from_collection([
        ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))  # type: DataStream
    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
        .with_timestamp_assigner(SecondColumnTimestampAssigner())

    class MyAggregateFunction(AggregateFunction):

        def create_accumulator(self) -> Tuple[int, str]:
            return 0, ''

        def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) -> Tuple[int, str]:
            return value[1] + accumulator[0], value[0]

        def get_result(self, accumulator: Tuple[int, str]):
            return accumulator[1], accumulator[0]

        def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]):
            return acc_a[0] + acc_b[0], acc_a[1]

    ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
        .key_by(lambda x: x[0], key_type=Types.STRING()) \
        .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
        .aggregate(MyAggregateFunction(),
                   accumulator_type=Types.TUPLE([Types.INT(), Types.STRING()]),
                   output_type=Types.TUPLE([Types.STRING(), Types.INT()]))

    ds.print()
    env.execute('test_window_aggregate_accumulator_type')


if __name__ == '__main__':
    main()
{code}

* Run the Python DataStream window job (saved as demo.py in this example) and check the printed result
{code:bash}
$ python demo.py
{code}
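To have something to compare the printed result against, the session-window logic of the job can be approximated in plain Python without Flink. The sketch below is an approximation under stated assumptions: no late data, each element opens a window of [timestamp, timestamp + gap), and windows that overlap or touch are merged. The function name {{expected_session_sums}} is hypothetical.

```python
from collections import defaultdict

GAP_MS = 2  # same gap as EventTimeSessionWindows.with_gap(Time.milliseconds(2))

# Same input as the job; the second field is both the value and the timestamp.
EVENTS = [('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)]


def expected_session_sums(events, gap_ms):
    """Group events per key into sessions and sum the second field per session."""
    by_key = defaultdict(list)
    for key, ts in events:
        by_key[key].append(ts)

    results = []
    for key, timestamps in by_key.items():
        timestamps.sort()
        session_end = None
        session_sum = 0
        for ts in timestamps:
            # A new session starts when the element lies strictly beyond the
            # end of the current merged window (assumption: touching merges).
            if session_end is not None and ts > session_end:
                results.append((key, session_sum))
                session_sum = 0
            session_sum += ts
            session_end = ts + gap_ms
        results.append((key, session_sum))
    return sorted(results)


if __name__ == "__main__":
    print(expected_session_sums(EVENTS, GAP_MS))
    # [('a', 3), ('a', 6), ('a', 15), ('b', 3), ('b', 17)]
```

Under these assumptions the job should emit five records carrying these key and sum pairs. The order and exact formatting of the lines printed by Flink may differ from this sorted list.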



