Posted to dev@flink.apache.org by "Huang Xingbo (Jira)" <ji...@apache.org> on 2022/08/11 07:18:00 UTC
[jira] [Created] (FLINK-28918) Release Testing: Running Python DataStream jobs in Thread Mode
Huang Xingbo created FLINK-28918:
------------------------------------
Summary: Release Testing: Running Python DataStream jobs in Thread Mode
Key: FLINK-28918
URL: https://issues.apache.org/jira/browse/FLINK-28918
Project: Flink
Issue Type: Sub-task
Components: API / Python
Affects Versions: 1.16.0
Reporter: Huang Xingbo
Fix For: 1.16.0
* Build the Flink source code
{code:bash}
$ cd {flink-source-code}
$ mvn clean install -DskipTests
{code}
* Prepare a Python Virtual Environment
{code:bash}
$ cd flink-python/dev
$ ./lint-python.sh -s basic
$ source .conda/bin/activate
{code}
* Install PyFlink from source code. For more details, you can refer to the [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
{code:bash}
$ cd flink-python/apache-flink-libraries
$ python setup.py sdist
$ pip install dist/*.tar.gz
$ cd ..
$ pip install -r dev/dev-requirements.txt
$ python setup.py sdist
$ pip install dist/*.tar.gz
{code}
h1. Test
* Write a Python DataStream job that runs in thread mode (saved here as demo.py)
{code:python}
from pyflink.common import Configuration
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment


def main():
    config = Configuration()
    # The option takes a string value: "process" (the default) or "thread".
    config.set_string("python.execution-mode", "thread")
    env = StreamExecutionEnvironment.get_execution_environment(config)

    ds = env.from_collection(
        [(1, '9', 0), (1, '5', 1), (1, '6', 2), (5, '5', 0), (5, '3', 1)],
        type_info=Types.ROW_NAMED(["v1", "v2", "v3"],
                                  [Types.INT(), Types.STRING(), Types.INT()]))

    def flat_map_func1(data):
        # Emit one (value, 1) pair per field of the incoming tuple.
        for i in data:
            yield int(i), 1

    def flat_map_func2(data):
        # Flatten the tuple into its individual fields.
        for i in data:
            yield i

    ds = ds.key_by(lambda x: x[0]) \
        .min_by("v2") \
        .map(lambda x: (x[0], x[1], x[2]),
             output_type=Types.TUPLE([Types.INT(), Types.STRING(), Types.INT()])) \
        .key_by(lambda x: x[2]) \
        .max_by(0) \
        .flat_map(flat_map_func1, output_type=Types.TUPLE([Types.INT(), Types.INT()])) \
        .key_by(lambda x: x[1]) \
        .min_by() \
        .flat_map(flat_map_func2, output_type=Types.INT()) \
        .key_by(lambda x: x) \
        .max_by()

    ds.print()
    env.execute("key_by_min_by_max_by_test_batch")


if __name__ == '__main__':
    main()
{code}
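To run the same job in both execution modes without editing the file, the mode could be read from the command line. The following is only a minimal sketch: the --mode flag and the helper names parse_mode and build_env are hypothetical and not part of PyFlink, while "python.execution-mode" and its values "process" and "thread" are the actual configuration option used above.

```python
import argparse

# Values accepted by the "python.execution-mode" option.
VALID_MODES = ("process", "thread")


def parse_mode(argv=None):
    """Return the mode requested via the hypothetical --mode flag."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", choices=VALID_MODES, default="thread")
    return parser.parse_args(argv).mode


def build_env(mode):
    """Create a StreamExecutionEnvironment configured for the given mode."""
    # Imported lazily so that parse_mode works even without PyFlink installed.
    from pyflink.common import Configuration
    from pyflink.datastream import StreamExecutionEnvironment

    config = Configuration()
    config.set_string("python.execution-mode", mode)
    return StreamExecutionEnvironment.get_execution_environment(config)
```

Inside main(), env = build_env(parse_mode()) would then replace the hard-coded configuration, so that the rest of the pipeline stays identical between the two runs.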
* Run the Python DataStream job and check the result
{code:bash}
$ python demo.py
{code}
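One possible way to check the result is to compare the printed records of a thread-mode run against a process-mode run of the same job. The sketch below assumes the standard output of both runs has already been captured as strings; the helper names normalize and same_records are hypothetical. It strips the "N> " subtask prefix that print() adds when the parallelism is greater than 1 and compares the records as multisets, because the interleaving of lines from parallel subtasks may differ between runs.

```python
import re
from collections import Counter

# Subtask prefix such as "3> " that the print sink adds when parallelism > 1.
_SUBTASK_PREFIX = re.compile(r"^\d+> ")


def normalize(output):
    """Return a multiset of printed records with subtask prefixes removed."""
    lines = (_SUBTASK_PREFIX.sub("", line.strip()) for line in output.splitlines())
    return Counter(line for line in lines if line)


def same_records(process_output, thread_output):
    """True if both runs printed the same records, ignoring line order."""
    return normalize(process_output) == normalize(thread_output)
```

This comparison only shows that the two modes emit the same set of records; it does not check the order in which they were emitted.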