Posted to user@beam.apache.org by po...@gmx.com on 2022/06/15 14:10:51 UTC

How to run Beam pipeline in Flink [Python]?


Hello there!

I'm learning Beam and Flink. I have Flink running on my PC, and I installed
the Beam module (pip install apache-beam).
My first step is to execute a simple script:
  
def run():
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Options for submitting the pipeline to the local Flink cluster.
    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_version=1.14",
        "--flink_master=localhost:8081",
        "--environment_config=localhost:50000",
    ])
    output_file = 'E:\\directory\\output.txt'
    with beam.Pipeline(options=options) as p:
        (p
            | 'Create file lines' >> beam.Create([
                'Each element must be a string.',
                'It writes one element per line.',
                'There are no guarantees on the line order.',
                'The data might be written into multiple files.',
            ])
            | 'Write to files' >> beam.io.WriteToText(output_file)
        )


if __name__ == "__main__":
    run()
  
I see the job in the Flink dashboard, but it stops after several seconds:

CHAIN MapPartition (MapPartition at [2]Write to
files/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:3320>), Map(decode)})
-> FlatMap (FlatMap at ExtractOutput[0]) FAILED  
  
I get the same problem if I try to open a file.
What is wrong here? I tried several example scripts, and none of them works.
I would appreciate any help taking my first steps with Beam and Flink.
Regards
Mike