You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Shankar Mane <sh...@games24x7.com> on 2021/06/04 14:28:19 UTC
[Question] Python Batch Pipeline On Flink - No Output
Team,
I am new to Beam and am running a simple Python pipeline with Flink as the runner.
The pipeline finishes successfully, but I am not getting the expected output at
the end.
#------------------------------------------------------------------------------------------------------------
Here is the pipeline code:
#!/usr/bin/python3
>
> import argparse
> import logging
> import re
> import typing
>
> import apache_beam as beam
> from apache_beam.io import ReadFromText
> from apache_beam.io import WriteToText
> from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
>
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import SetupOptions
> from decimal import Decimal
>
>
> class AmoountExtractingDoFn(beam.DoFn):
> def process(self, element):
> # Returns an iterator over the words of this element.
> try:
> strArr = str(element).replace("\"", "").split(",")
> print(strArr)
> return [(strArr[1], float(strArr[2]))]
> except Exception as e :
> pass
>
> # Format the counts into a PCollection of strings.
> def format_result(userid, amount):
> try:
> return '%s = %d' % (userid, amount)
> except Exception as e:
> pass
>
> def run(argv=None, save_main_session=True):
> """Main entry point; defines and runs the wordcount pipeline."""
> parser = argparse.ArgumentParser()
> parser.add_argument(
> '--input',
> dest='input',
> default='gs://dataflow-samples/shakespeare/kinglear.txt',
> help='Input file to process.')
> parser.add_argument(
> '--output',
> dest='output',
> required=True,
> help='Output file to write results to.')
> known_args, pipeline_args = parser.parse_known_args(argv)
>
> # We use the save_main_session option because one or more DoFn's in this
> # workflow rely on global context (e.g., a module imported at module level).
> pipeline_options = PipelineOptions(pipeline_args)
> pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
>
> # The pipeline will be run on exiting the with block.
> with beam.Pipeline(options=pipeline_options) as p:
> # Read the text file[pattern] into a PCollection.
> lines = p | 'Read' >> ReadFromText(known_args.input)
> counts = (
> lines
> | 'ExtractAmountPerUser' >> (beam.ParDo(AmoountExtractingDoFn()))
> | 'GroupAndSum' >> beam.CombinePerKey(sum))
>
> output = counts | 'Format' >> beam.MapTuple(format_result)
>
> output | beam.Map(lambda x: (b'', x.encode('utf-8'))).with_output_types(typing.Tuple[bytes, bytes])\
> | 'Write to Kafka' >> WriteToKafka(producer_config={'bootstrap.servers': '10.xxx.xxx.xxx:9092'}, topic='test2')
>
>
#------------------------------------------------------------------------------------------------------------
Here are the commands used to run the above pipeline:
CMD-1: The pipeline runs fine on Flink, finishes successfully, and the
output is generated at the end.
CMD-2: The pipeline runs fine on Flink, finishes successfully, but NO
OUTPUT is produced at the end.
> # CMD-1: WORKING - output gets generated
> python3 batch.py \
> --input beam-userbase.csv \
> --output output/batch \
> --runner=FlinkRunner \
> --flink_submit_uber_jar \
> --flink_master=localhost:8090 \
> --environment_type=LOOPBACK
>
>
> # CMD-2: WORKING - NO OUTPUT
> python3 batch.py \
> --input beam-userbase.csv \
> --output output/batch \
> --runner=FlinkRunner \
> --flink_submit_uber_jar \
> --flink_master=localhost:8090 \
> --environment_type=DOCKER \
> --environment_config="apache/beam_python3.7_sdk:2.29.0"
Re: [Question] Python Batch Pipeline On Flink - No Output
Posted by Shankar Mane <sh...@games24x7.com>.
Please help..
On Fri, 4 Jun 2021, 19:58 Shankar Mane, <sh...@games24x7.com> wrote:
> Team,
>
> I am new to Beam. And running a simple python pipeline on flink as runner.
> Pipelines get Successfully Finished But i am not getting expected output at
> the end.
>
> #------------------------------------------------------------------------------------------------------------
>
> Here is Pipeline code :
>
> #!/usr/bin/python3
>>
>> import argparse
>> import logging
>> import re
>> import typing
>>
>> import apache_beam as beam
>> from apache_beam.io import ReadFromText
>> from apache_beam.io import WriteToText
>> from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
>>
>> from apache_beam.options.pipeline_options import PipelineOptions
>> from apache_beam.options.pipeline_options import SetupOptions
>> from decimal import Decimal
>>
>>
>> class AmoountExtractingDoFn(beam.DoFn):
>> def process(self, element):
>> # Returns an iterator over the words of this element.
>> try:
>> strArr = str(element).replace("\"", "").split(",")
>> print(strArr)
>> return [(strArr[1], float(strArr[2]))]
>> except Exception as e :
>> pass
>>
>> # Format the counts into a PCollection of strings.
>> def format_result(userid, amount):
>> try:
>> return '%s = %d' % (userid, amount)
>> except Exception as e:
>> pass
>>
>> def run(argv=None, save_main_session=True):
>> """Main entry point; defines and runs the wordcount pipeline."""
>> parser = argparse.ArgumentParser()
>> parser.add_argument(
>> '--input',
>> dest='input',
>> default='gs://dataflow-samples/shakespeare/kinglear.txt',
>> help='Input file to process.')
>> parser.add_argument(
>> '--output',
>> dest='output',
>> required=True,
>> help='Output file to write results to.')
>> known_args, pipeline_args = parser.parse_known_args(argv)
>>
>> # We use the save_main_session option because one or more DoFn's in this
>> # workflow rely on global context (e.g., a module imported at module level).
>> pipeline_options = PipelineOptions(pipeline_args)
>> pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
>>
>> # The pipeline will be run on exiting the with block.
>> with beam.Pipeline(options=pipeline_options) as p:
>> # Read the text file[pattern] into a PCollection.
>> lines = p | 'Read' >> ReadFromText(known_args.input)
>> counts = (
>> lines
>> | 'ExtractAmountPerUser' >> (beam.ParDo(AmoountExtractingDoFn()))
>> | 'GroupAndSum' >> beam.CombinePerKey(sum))
>>
>> output = counts | 'Format' >> beam.MapTuple(format_result)
>>
>> output | beam.Map(lambda x: (b'', x.encode('utf-8'))).with_output_types(typing.Tuple[bytes, bytes])\
>> | 'Write to Kafka' >> WriteToKafka(producer_config={'bootstrap.servers': '10.xxx.xxx.xxx:9092'}, topic='test2')
>>
>>
>
> #-----------------------------------------------------------
> -------------------------------------------------
>
> Here is CMDs to run above pipeline :
>
> CMD-1 : This pipeline is working fine on flink, finished successfully and
> output also gets generated at the end.
> CMD-2: This pipeline is working fine on flink, finished successfully and
> NO OUTPUT at the end.
>
>
>
>> # CMD-1 :
>> *WORKING - output gets generate*python3 batch.py \
>> --input beam-userbase.csv \
>> --output output/batch \
>> --runner=FlinkRunner \
>> --flink_submit_uber_jar \
>> --flink_master=localhost:8090 \
>> --environment_type=LOOPBACK
>>
>>
>
>> *# CMD-2: **WORKING - NO OUTPUT*
>
> python3 batch.py \
>> --input beam-userbase.csv \
>> --output output/batch \
>> --runner=FlinkRunner \
>> --flink_submit_uber_jar \
>> --flink_master=localhost:8090 \
>> --environment_type=DOCKER \
>> --environment_config="apache/beam_python3.7_sdk:2.29.0"
>
>
>
Re: [Question] Python Batch Pipeline On Flink - No Output
Posted by Shankar Mane <sh...@games24x7.com>.
Please help..
On Fri, 4 Jun 2021, 19:58 Shankar Mane, <sh...@games24x7.com> wrote:
> Team,
>
> I am new to Beam. And running a simple python pipeline on flink as runner.
> Pipelines get Successfully Finished But i am not getting expected output at
> the end.
>
> #------------------------------------------------------------------------------------------------------------
>
> Here is Pipeline code :
>
> #!/usr/bin/python3
>>
>> import argparse
>> import logging
>> import re
>> import typing
>>
>> import apache_beam as beam
>> from apache_beam.io import ReadFromText
>> from apache_beam.io import WriteToText
>> from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
>>
>> from apache_beam.options.pipeline_options import PipelineOptions
>> from apache_beam.options.pipeline_options import SetupOptions
>> from decimal import Decimal
>>
>>
>> class AmoountExtractingDoFn(beam.DoFn):
>> def process(self, element):
>> # Returns an iterator over the words of this element.
>> try:
>> strArr = str(element).replace("\"", "").split(",")
>> print(strArr)
>> return [(strArr[1], float(strArr[2]))]
>> except Exception as e :
>> pass
>>
>> # Format the counts into a PCollection of strings.
>> def format_result(userid, amount):
>> try:
>> return '%s = %d' % (userid, amount)
>> except Exception as e:
>> pass
>>
>> def run(argv=None, save_main_session=True):
>> """Main entry point; defines and runs the wordcount pipeline."""
>> parser = argparse.ArgumentParser()
>> parser.add_argument(
>> '--input',
>> dest='input',
>> default='gs://dataflow-samples/shakespeare/kinglear.txt',
>> help='Input file to process.')
>> parser.add_argument(
>> '--output',
>> dest='output',
>> required=True,
>> help='Output file to write results to.')
>> known_args, pipeline_args = parser.parse_known_args(argv)
>>
>> # We use the save_main_session option because one or more DoFn's in this
>> # workflow rely on global context (e.g., a module imported at module level).
>> pipeline_options = PipelineOptions(pipeline_args)
>> pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
>>
>> # The pipeline will be run on exiting the with block.
>> with beam.Pipeline(options=pipeline_options) as p:
>> # Read the text file[pattern] into a PCollection.
>> lines = p | 'Read' >> ReadFromText(known_args.input)
>> counts = (
>> lines
>> | 'ExtractAmountPerUser' >> (beam.ParDo(AmoountExtractingDoFn()))
>> | 'GroupAndSum' >> beam.CombinePerKey(sum))
>>
>> output = counts | 'Format' >> beam.MapTuple(format_result)
>>
>> output | beam.Map(lambda x: (b'', x.encode('utf-8'))).with_output_types(typing.Tuple[bytes, bytes])\
>> | 'Write to Kafka' >> WriteToKafka(producer_config={'bootstrap.servers': '10.xxx.xxx.xxx:9092'}, topic='test2')
>>
>>
>
> #-----------------------------------------------------------
> -------------------------------------------------
>
> Here is CMDs to run above pipeline :
>
> CMD-1 : This pipeline is working fine on flink, finished successfully and
> output also gets generated at the end.
> CMD-2: This pipeline is working fine on flink, finished successfully and
> NO OUTPUT at the end.
>
>
>
>> # CMD-1 :
>> *WORKING - output gets generate*python3 batch.py \
>> --input beam-userbase.csv \
>> --output output/batch \
>> --runner=FlinkRunner \
>> --flink_submit_uber_jar \
>> --flink_master=localhost:8090 \
>> --environment_type=LOOPBACK
>>
>>
>
>> *# CMD-2: **WORKING - NO OUTPUT*
>
> python3 batch.py \
>> --input beam-userbase.csv \
>> --output output/batch \
>> --runner=FlinkRunner \
>> --flink_submit_uber_jar \
>> --flink_master=localhost:8090 \
>> --environment_type=DOCKER \
>> --environment_config="apache/beam_python3.7_sdk:2.29.0"
>
>
>