Posted to user@flink.apache.org by po...@gmx.com on 2022/06/10 21:08:00 UTC
Python and first code to execute Beam pipeline on Flink
Hello there!
I'm learning Beam and Flink. I have Flink running on my PC, and I installed
the Beam module (pip install apache-beam).
My first step is to execute a simple script:
def run():
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Submit the pipeline to the Flink cluster at localhost:8081 via the FlinkRunner.
    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_version=1.14",
        "--flink_master=localhost:8081",
        "--environment_config=localhost:50000"
    ])

    # WriteToText treats this as a prefix and appends a shard suffix to each file name.
    output_file = 'E:\\directory\\output.txt'

    # Create four in-memory strings and write each one as a line of text.
    with beam.Pipeline(options=options) as p:
        (p
         | 'Create file lines' >> beam.Create([
             'Each element must be a string.',
             'It writes one element per line.',
             'There are no guarantees on the line order.',
             'The data might be written into multiple files.',
         ])
         | 'Write to files' >> beam.io.WriteToText(output_file)
        )


if __name__ == "__main__":
    run()
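The script sets --environment_config but not --environment_type. In Beam's portable pipeline options, the meaning of the config value depends on the environment type: a container image for DOCKER (which Beam documents as the default), the address of an already-running SDK worker pool (such as localhost:50000) for EXTERNAL, and nothing at all for LOOPBACK, which runs user code in the process that submitted the job. The sketch below is a hypothetical helper, build_flink_args, that is not part of Beam. It only assembles the flag list with the type stated explicitly, uses only the standard library, and never contacts Flink. Whether setting the type explicitly resolves the failure described in this post is an assumption, not something confirmed here.

```python
# Hypothetical helper (not part of Apache Beam): builds the flag list that the
# script passes to PipelineOptions. Standard library only; it does not import
# apache_beam or contact Flink.

# Environment types this sketch accepts; Beam itself may support others.
KNOWN_ENV_TYPES = {"DOCKER", "PROCESS", "EXTERNAL", "LOOPBACK"}


def build_flink_args(flink_master, flink_version, environment_type,
                     environment_config=None):
    """Return FlinkRunner flags with --environment_type set explicitly."""
    env_type = environment_type.upper()
    if env_type not in KNOWN_ENV_TYPES:
        raise ValueError(f"unknown environment type: {environment_type}")
    # EXTERNAL points at an SDK worker pool that must already be listening.
    if env_type == "EXTERNAL" and not environment_config:
        raise ValueError("EXTERNAL needs a worker-pool address, e.g. localhost:50000")
    # LOOPBACK runs user code in the submitting process; this helper treats
    # an accompanying config value as a mistake.
    if env_type == "LOOPBACK" and environment_config:
        raise ValueError("LOOPBACK does not take an environment_config")

    args = [
        "--runner=FlinkRunner",
        f"--flink_version={flink_version}",
        f"--flink_master={flink_master}",
        f"--environment_type={env_type}",
    ]
    if environment_config:
        args.append(f"--environment_config={environment_config}")
    return args


if __name__ == "__main__":
    # Same master and config value as the script, with the type made explicit.
    print(build_flink_args("localhost:8081", "1.14", "EXTERNAL", "localhost:50000"))
    # Local-testing variant that needs no separately started worker pool.
    print(build_flink_args("localhost:8081", "1.14", "LOOPBACK"))
```

The returned list can be passed to PipelineOptions(...) in place of the hand-written list, leaving the rest of the script unchanged.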
I can see the job in the Flink dashboard, but it fails after several seconds:
    CHAIN MapPartition (MapPartition at [2]Write to files/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:3320>), Map(decode)})
      -> FlatMap (FlatMap at ExtractOutput[0]) FAILED
I get the same failure when I try to read from a file.
What am I doing wrong? I have tried several example scripts, and none of them work.
I would appreciate any help taking my first steps with Beam and Flink.
Regards
Mike