Posted to dev@beam.apache.org by Shankar Mane <sh...@games24x7.com> on 2021/06/04 14:28:19 UTC

[Question] Python Batch Pipeline On Flink - No Output

Team,

I am new to Beam and am running a simple Python pipeline with Flink as the
runner. The pipeline finishes successfully, but I am not getting the expected
output at the end.

#------------------------------------------------------------------------------------------------------------

Here is the pipeline code:

> #!/usr/bin/python3
>
> import argparse
> import logging
> import re
> import typing
>
> import apache_beam as beam
> from apache_beam.io import ReadFromText
> from apache_beam.io import WriteToText
> from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
>
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import SetupOptions
> from decimal import Decimal
>
>
> class AmountExtractingDoFn(beam.DoFn):
>     def process(self, element):
>         # Returns a (userid, amount) pair parsed from this CSV line.
>         try:
>             strArr = str(element).replace("\"", "").split(",")
>             print(strArr)
>             return [(strArr[1], float(strArr[2]))]
>         except Exception:
>             pass  # Skip lines that cannot be parsed.
>
> # Format each (userid, total amount) pair into a string.
> def format_result(userid, amount):
>     try:
>         return '%s = %d' % (userid, amount)
>     except Exception as e:
>         pass
>
> def run(argv=None, save_main_session=True):
>   """Main entry point; defines and runs the wordcount pipeline."""
>   parser = argparse.ArgumentParser()
>   parser.add_argument(
>       '--input',
>       dest='input',
>       default='gs://dataflow-samples/shakespeare/kinglear.txt',
>       help='Input file to process.')
>   parser.add_argument(
>       '--output',
>       dest='output',
>       required=True,
>       help='Output file to write results to.')
>   known_args, pipeline_args = parser.parse_known_args(argv)
>
>   # We use the save_main_session option because one or more DoFn's in this
>   # workflow rely on global context (e.g., a module imported at module level).
>   pipeline_options = PipelineOptions(pipeline_args)
>   pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
>
>   # The pipeline will be run on exiting the with block.
>   with beam.Pipeline(options=pipeline_options) as p:
>     # Read the text file[pattern] into a PCollection.
>     lines = p | 'Read' >> ReadFromText(known_args.input)
>     counts = (
>         lines
>         | 'ExtractAmountPerUser' >> beam.ParDo(AmountExtractingDoFn())
>         | 'GroupAndSum' >> beam.CombinePerKey(sum))
>
>     output = counts | 'Format' >> beam.MapTuple(format_result)
>
>     (output
>      | beam.Map(lambda x: (b'', x.encode('utf-8'))).with_output_types(
>          typing.Tuple[bytes, bytes])
>      | 'Write to Kafka' >> WriteToKafka(
>          producer_config={'bootstrap.servers': '10.xxx.xxx.xxx:9092'},
>          topic='test2'))
>
> # Entry point, as in the standard wordcount example this is based on.
> if __name__ == '__main__':
>   logging.getLogger().setLevel(logging.INFO)
>   run()
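
Since the script requires --output but never uses it (WriteToText is imported
yet never applied), one way to double-check whether the pipeline produces any
data at all is to add a file sink next to the Kafka one. A minimal sketch,
assuming it goes inside the with block after the 'Format' step (the
'WriteToFile' label is an arbitrary choice here):

>     # Hypothetical extra sink: also write the formatted results to the
>     # --output path so they can be inspected on disk, independent of Kafka.
>     output | 'WriteToFile' >> WriteToText(known_args.output)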

#------------------------------------------------------------------------------------------------------------

Here are the commands used to run the above pipeline:

CMD-1: The pipeline runs fine on Flink, finishes successfully, and the output
is generated at the end.
CMD-2: The pipeline runs fine on Flink and finishes successfully, but NO
output is produced at the end.



> # CMD-1: WORKING - output gets generated
> python3 batch.py \
>   --input beam-userbase.csv \
>   --output output/batch \
>   --runner=FlinkRunner \
>   --flink_submit_uber_jar \
>   --flink_master=localhost:8090 \
>   --environment_type=LOOPBACK

> # CMD-2: WORKING - NO OUTPUT
> python3 batch.py \
>   --input beam-userbase.csv \
>   --output output/batch \
>   --runner=FlinkRunner \
>   --flink_submit_uber_jar \
>   --flink_master=localhost:8090 \
>   --environment_type=DOCKER \
>   --environment_config="apache/beam_python3.7_sdk:2.29.0"
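
One difference between the two runs worth spelling out: with
--environment_type=LOOPBACK the SDK harness runs inside the submitting
process, so print() output from the DoFn shows up in the terminal. With
--environment_type=DOCKER the harness runs inside the
apache/beam_python3.7_sdk:2.29.0 container, so anything printed to stdout ends
up in the container/task-manager logs instead, and the Kafka broker at
10.xxx.xxx.xxx:9092 has to be reachable from inside that container. A minimal
sketch of a logged alternative to print(), using the logging module the
script already imports, so parsed records appear in the worker logs under
either environment (the logger name is an arbitrary choice):

>             # Inside AmountExtractingDoFn.process, instead of print(strArr):
>             logging.getLogger('amount_extractor').info('parsed record: %s', strArr)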

Re: [Question] Python Batch Pipeline On Flink - No Output

Posted by Shankar Mane <sh...@games24x7.com>.
Please help..

