Posted to dev@beam.apache.org by radhika sharma <ra...@gmail.com> on 2021/02/03 13:17:41 UTC

How to create an empty file using Apache Beam

I have created a Dataflow template as below:

import argparse
import logging
from datetime import date

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Evaluated when the script runs (for a classic template, that is at
# template-creation time, not each time the template is launched).
current_date = date.today().strftime("%Y%m%d")

# Raw string, so the regex backslashes reach BigQuery unchanged.
QUERY = r"""
SELECT DISTINCT(IF(LENGTH(MOBILE)=10, CONCAT('91',MOBILE),REPLACE(MOBILE,'+91 ','91')))
FROM `whr-asia-datalake-nonprod.WHR_DATALAKE.C4C_CONSUMER_RAW`
WHERE REGEXP_CONTAINS(REGEXP_REPLACE(Mobile, ' ', ''),r'^(?:(?:\+|0{0,2})91(\s*[\-]\s*)?|[0]?)?[6789]\d{9}$')
"""

OUTPUT_PREFIX = ('gs://whr-asia-datalake-dev-standard/outbound/Valuefirst/'
                 'WHR_MOBILE_CNSNT_REQ' + current_date)

def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    (p | 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY, use_standard_sql=True))
       | 'read values' >> beam.Map(lambda x: x.values())
       # Each value becomes one pipe-delimited line of the output file.
       | 'CSV format' >> beam.Map(lambda row: '|'.join(
             'WRPOOL|5667788|' + str(column) + '|"Hi, This msg is from Whirlpool DL"'
             for column in row))
       | 'Write_to_GCS' >> beam.io.WriteToText(
             OUTPUT_PREFIX,
             file_name_suffix='.csv',
             header='SENDER_ID|SHORTCODE|MOBILE_NUM|CONSENT_MSG'))
    p.run().wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

I need to create an empty file after the CSV file is created, but I'm not sure which option to use. Can someone help?

Please help. It's urgent.

I have tried beam.Create('gs://whr-asia-datalake-dev-standard/outbound/Valuefirst/Valuefirst.done') to create the empty file, but it doesn't work.


Re: How to create an empty file using Apache Beam

Posted by Robert Bradshaw <ro...@google.com>.
You could write a DoFn that "consumes" the output of the write as a side input
and touches the file manually, e.g.:

    write_result = ... | beam.io.WriteToText(...)
    p | beam.Create([None]) | beam.Map(
        lambda unused_none, unused_side: create_file(),
        unused_side=beam.pvalue.AsList(write_result))

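where create_file() is your own function that actually creates the file in
question. Although write_result is never used inside the lambda, declaring it
as a side input makes the Map block until it has been computed, i.e. until
the write has finished.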

