Posted to user@flink.apache.org by po...@gmx.com on 2022/06/10 21:08:00 UTC

Python and first code to execute Beam pipeline on Flink


Hello there!



I'm learning Beam and Flink. I have Flink running on my PC, and I installed
the Beam module (pip install apache-beam).

My first step is to execute a simple script:



def run():
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Pipeline options passed as command-line style flags
    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_version=1.14",
        "--flink_master=localhost:8081",
        "--environment_config=localhost:50000"
    ])

    output_file = 'E:\\directory\\output.txt'

    # Build an in-memory collection of strings and write it out as text
    with beam.Pipeline(options=options) as p:
        (p
            | 'Create file lines' >> beam.Create([
                'Each element must be a string.',
                'It writes one element per line.',
                'There are no guarantees on the line order.',
                'The data might be written into multiple files.',
            ])
            | 'Write to files' >> beam.io.WriteToText(output_file)
        )

if __name__ == "__main__":
    run()



I can see the job in the Flink dashboard, but it fails after several seconds:

CHAIN MapPartition (MapPartition at [2]Write to
files/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:3320>), Map(decode)})
-> FlatMap (FlatMap at ExtractOutput[0]) FAILED



I get the same failure when I try to read a file.

What is wrong here? I have tried several example scripts, and none of them work.

I would appreciate any help taking my first steps with Beam and Flink.

Regards

Mike