Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2020/10/28 17:11:01 UTC
[jira] [Updated] (BEAM-10268) AssertionError('Missing boto3 requirement')
[ https://issues.apache.org/jira/browse/BEAM-10268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Beam JIRA Bot updated BEAM-10268:
---------------------------------
Labels: newbie stale-P2 (was: newbie)
> AssertionError('Missing boto3 requirement')
> -------------------------------------------
>
> Key: BEAM-10268
> URL: https://issues.apache.org/jira/browse/BEAM-10268
> Project: Beam
> Issue Type: Bug
> Components: beam-community
> Reporter: Nikhil
> Priority: P2
> Labels: newbie, stale-P2
>
> *Error as:*
> apache_beam.io.filesystem.BeamIOError: Match operation failed with exceptions {'s3://xxxxxx.csv': BeamIOError("exists() operation failed with exceptions {'s3://xxxxxx.csv': AssertionError('Missing boto3 requirement')}")}
>
> *Agenda:*
> I am writing a Dataflow pipeline in Python to import data from an S3 bucket, and I am running it with the Dataflow runner.
> Could you please advise where I am going wrong?
>
> *The code I have written:*
> import argparse
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
>
> class CustomPipelineOptions(PipelineOptions):
>     @classmethod
>     def _add_argparse_args(cls, parser):
>         # Value-provider arguments can be supplied at template run time.
>         parser.add_value_provider_argument('--input', help='Path of the file to read from.')
>         parser.add_value_provider_argument('--output', help='Output file to write results to.')
>         parser.add_value_provider_argument('--source_bucket', help='AWS source bucket name.')
>         parser.add_value_provider_argument('--aws_access_key_id', help='AWS access key id.')
>         parser.add_value_provider_argument('--aws_secret_access_key', help='Your AWS secret access key.')
>         parser.add_value_provider_argument('--aws_default_region', help='Sets STS endpoint resolution logic.')
>
> if __name__ == '__main__':
>     parser = argparse.ArgumentParser()
>     # parse_known_args() returns (known_args, remaining_args); the remaining
>     # command-line arguments are handed to Beam as pipeline options.
>     _, pipeline_args = parser.parse_known_args()
>     pipeline_options = PipelineOptions(pipeline_args)
>     pipeline_options.view_as(SetupOptions).save_main_session = True
>     # Expose the custom arguments declared above.
>     custom_options = pipeline_options.view_as(CustomPipelineOptions)
>     p = beam.Pipeline(options=pipeline_options)
>
>     p1 = (p | 'Read assetdb' >> beam.io.ReadFromText(custom_options.input))
>     p.run().wait_until_finish()
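
The AssertionError quoted in the report appears to come from Beam's S3 client failing to import boto3, which usually means the AWS extra is not installed in the environment where the S3 path is resolved. The following is a minimal sketch, not part of the original report, of a pre-flight check that fails early with an actionable message. The helper names missing_modules and check_s3_requirements are hypothetical, and the suggested install command assumes the apache-beam[aws] extra is how boto3 is normally pulled in.

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names that cannot be found by the import system."""
    return [name for name in names if importlib.util.find_spec(name) is None]


def check_s3_requirements():
    """Raise before pipeline construction if boto3 is not importable here.

    Hypothetical helper: it only inspects the interpreter it runs in.
    """
    missing = missing_modules(["boto3"])
    if missing:
        raise RuntimeError(
            "Missing modules: %s. Assumed fix: pip install 'apache-beam[aws]'"
            % ", ".join(missing)
        )
```

Calling check_s3_requirements() before building the pipeline only covers the machine that launches the job. Dataflow workers run in a separate environment, so the dependency most likely also has to be shipped to them, for example through the --requirements_file pipeline option.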
--
This message was sent by Atlassian Jira
(v8.3.4#803005)