Posted to user@beam.apache.org by Yu Watanabe <yu...@gmail.com> on 2019/09/25 22:44:10 UTC
How do you call a user-defined ParDo class from the pipeline in the Portable Runner (Apache Flink)?
Hello.
I would like to ask a question about ParDo.
I am getting the error below in the TaskManager when running code on Apache
Flink with the Portable Runner.
=====================================================
....
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1078, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 265, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, in loads
    return load(file, ignore)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, in load
    obj = pik.load()
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in find_class
    return StockUnpickler.find_class(self, module, name)
AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module 'apache_beam.runners.worker.sdk_worker_main' from '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'>
=====================================================
"FlattenTagFilesFn" is a DoFn that I apply with ParDo in the pipeline, as shown below.
=====================================================
frames, counts = ({'pre': pcollPre, 'post': pcollPost}
                  | 'combined:cogroup' >> beam.CoGroupByKey()
                  | 'combined:exclude' >> beam.Filter(
                      lambda x: (len(x[1]['pre']) > 0) and (len(x[1]['post']) > 0))
                  | 'combined:flat' >> beam.ParDo(
                      FlattenTagFilesFn(s3Bucket, s3Creds)).with_outputs(
                          'counts', main='frames'))
=====================================================
The class is defined in the same file, as follows:
=====================================================
class FlattenTagFilesFn(beam.DoFn):
    def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
        self.s3Bucket = s3Bucket
        self.s3Creds = s3Creds
        self.maxKeys = maxKeys
=====================================================
This is not a problem when running the pipeline with the DirectRunner.
How should I import the class used by ParDo when running on Flink?
Thanks,
Yu Watanabe
--
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.tennis@gmail.com
LinkedIn: <https://www.linkedin.com/in/yuwatanabe1>  Twitter: <https://twitter.com/yuwtennis>
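Below is a minimal sketch of the mechanism behind this AttributeError. It uses only the standard-library pickle module. Beam itself uses dill, which behaves differently for some objects, so treat this as an illustration rather than a reproduction.

A class is pickled as a reference: its module name plus its qualified name. The unpickling process must therefore be able to look that name up. Here a hypothetical module, `fake_main`, stands in for the launcher's `__main__`. Deleting the class from it mimics the SDK worker, whose `__main__` is `sdk_worker_main` and defines no `FlattenTagFilesFn`. This is consistent with the DirectRunner case working, since there the DoFn is typically deserialized in the same process, where the launcher's `__main__` still defines the class.

```python
import pickle
import sys
import types

# Hypothetical module standing in for the launcher's __main__.
fake_main = types.ModuleType("fake_main")
sys.modules["fake_main"] = fake_main


class FlattenTagFilesFn:  # plain class: Beam is not needed to show the pickling behavior
    def __init__(self, s3Bucket):
        self.s3Bucket = s3Bucket


# Make pickle record the class as fake_main.FlattenTagFilesFn.
FlattenTagFilesFn.__module__ = "fake_main"
FlattenTagFilesFn.__qualname__ = "FlattenTagFilesFn"
fake_main.FlattenTagFilesFn = FlattenTagFilesFn

payload = pickle.dumps(FlattenTagFilesFn("my-bucket"))  # stores the class by name only

# Launcher side: the name resolves, so the round trip works.
print(pickle.loads(payload).s3Bucket)  # my-bucket

# Worker side: the module exists, but nothing in it is called FlattenTagFilesFn.
del fake_main.FlattenTagFilesFn
try:
    pickle.loads(payload)
except AttributeError as err:
    worker_error = err
    print("AttributeError:", err)
```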
Re: How do you call a user-defined ParDo class from the pipeline in the Portable Runner (Apache Flink)?
Posted by Yu Watanabe <yu...@gmail.com>.
Actually, there is a good example in the latest wordcount.py in the master branch:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py
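The goal behind the super() call (one boto3 client per DoFn) does not actually depend on super(). Below is a minimal sketch that runs without Beam. It makes three assumptions, all hypothetical: `DoFnBase` stands in for `beam.DoFn`, `FakeS3Client` stands in for a boto3 client, and the credentials are a plain dict. The sketch does three things:

- It calls the base constructor by name rather than through zero-argument super().
- It passes the credentials as a plain dict.
- It creates the client in a `setup()` method after the object has been unpickled.

Recent Beam Python SDKs provide a `DoFn.setup()` method for this kind of per-instance initialization; confirm that your SDK version has it. Otherwise, the lazy `hasattr` check in the original `process()` serves the same purpose.

```python
import pickle
import threading


class DoFnBase:
    """Hypothetical stand-in for beam.DoFn, so this sketch runs without Beam."""

    def __init__(self):
        self.base_initialized = True

    def setup(self):
        """Per-instance initialization hook; does nothing by default."""


class FakeS3Client:
    """Hypothetical stand-in for a boto3 client. It holds a lock, so it cannot be pickled."""

    def __init__(self, key_id, secret):
        self._lock = threading.Lock()
        self.key_id = key_id
        self._secret = secret


class FlattenTagFilesFn(DoFnBase):
    def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
        DoFnBase.__init__(self)  # call the base class by name instead of super().__init__()
        self.s3Bucket = s3Bucket
        self.s3Creds = s3Creds   # plain dict: unpickling it needs no user module
        self.maxKeys = maxKeys
        self.s3Client = None     # built later, so no live client is pickled

    def setup(self):
        # Runs once per DoFn instance, after it has been unpickled on the worker.
        self.s3Client = FakeS3Client(self.s3Creds["awsAccessKeyId"],
                                     self.s3Creds["awsSecretAccessKey"])


creds = {"awsAccessKeyId": "example-id", "awsSecretAccessKey": "example-secret"}
fn = FlattenTagFilesFn("my-bucket", creds)

payload = pickle.dumps(fn)         # launcher: serialize the DoFn
worker_fn = pickle.loads(payload)  # worker: rebuild it
worker_fn.setup()                  # worker: create the client once
print(worker_fn.base_initialized, worker_fn.s3Client.key_id)  # True example-id

# Creating the client in __init__ instead would make the DoFn itself unpicklable.
try:
    pickle.dumps(FakeS3Client("example-id", "example-secret"))
except TypeError as err:
    client_pickle_error = err
    print("cannot pickle the client:", err)
```

Keeping the client out of `__init__` matters because the DoFn is serialized when the pipeline is constructed, before any worker runs it.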
On Thu, Sep 26, 2019 at 12:00 PM Yu Watanabe <yu...@gmail.com> wrote:
> Thank you for the help.
>
> I have chosen to remove the super().__init__() call.
>
> Thanks,
> Yu
>
> On Thu, Sep 26, 2019 at 9:18 AM Ankur Goenka <go...@google.com> wrote:
>
>> super() has some issues while pickling in Python 3. Please refer to
>> https://github.com/uqfoundation/dill/issues/300 for more details.
>>
>> Removing the reference to super() in your DoFn should help.
>>
>> On Wed, Sep 25, 2019 at 5:13 PM Yu Watanabe <yu...@gmail.com>
>> wrote:
>>
>>> Thank you for the reply.
>>>
>>> Setting "save_main_session" did not help; however, the situation has changed.
>>>
>>> 1. Output of get_all_options(): "save_main_session" is set to True.
>>>
>>> =================================================================================
>>> 2019-09-26 09:04:11,586 DEBUG Pipeline Options:
>>> {'wait_until_finish_duration': None, 'update': False, 'min_cpu_platform':
>>> None, 'dataflow_endpoint': 'https://dataflow.googleapis.com',
>>> 'environment_config': 'asia.gcr.io/creationline001/beam/python3:latest',
>>> 'machine_type': None, 'enable_streaming_engine': False, 'sdk_location':
>>> 'default', 'profile_memory': False, 'max_num_workers': None,
>>> 'type_check_strictness': 'DEFAULT_TO_ANY', 'streaming': False,
>>> 'setup_file': None, 'network': None, 'on_success_matcher': None,
>>> 'requirements_cache': None, 'service_account_email': None,
>>> 'environment_type': 'DOCKER', 'disk_type': None, 'labels': None,
>>> 'profile_location': None, 'direct_runner_use_stacked_bundle': True,
>>> 'use_public_ips': None, ***** 'save_main_session': True, *******
>>> 'direct_num_workers': 1, 'num_workers': None,
>>> 'worker_harness_container_image': None, 'template_location': None,
>>> 'hdfs_port': None, 'flexrs_goal': None, 'profile_cpu': False,
>>> 'transform_name_mapping': None, 'profile_sample_rate': 1.0, 'runner':
>>> 'PortableRunner', 'project': None, 'dataflow_kms_key': None,
>>> 'job_endpoint': 'localhost:8099', 'extra_packages': None,
>>> 'environment_cache_millis': 0, 'dry_run': False, 'autoscaling_algorithm':
>>> None, 'staging_location': None, 'job_name': None, 'no_auth': False,
>>> 'runtime_type_check': False, 'direct_runner_bundle_repeat': 0,
>>> 'subnetwork': None, 'pipeline_type_check': True, 'hdfs_user': None,
>>> 'dataflow_job_file': None, 'temp_location': None, 'sdk_worker_parallelism':
>>> 0, 'zone': None, 'experiments': ['beam_fn_api'], 'hdfs_host': None,
>>> 'disk_size_gb': None, 'dataflow_worker_jar': None, 'requirements_file':
>>> None, 'beam_plugins': None, 'pubsubRootUrl': None, 'region': None}
>>>
>>> =================================================================================
>>>
>>> 2. The error in the TaskManager log did not change.
>>>
>>> ==================================================================================
>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in find_class
>>>     return StockUnpickler.find_class(self, module, name)
>>> AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module 'apache_beam.runners.worker.sdk_worker_main' from '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'>
>>>
>>> ==================================================================================
>>>
>>> 3. However, if I comment out "super().__init__()" in my code, the error
>>> changes.
>>>
>>> ==================================================================================
>>>   File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1078, in _create_pardo_operation
>>>     dofn_data = pickler.loads(serialized_fn)
>>>   File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 265, in loads
>>>     return dill.loads(s)
>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, in loads
>>>     return load(file, ignore)
>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, in load
>>>     obj = pik.load()
>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in find_class
>>>     return StockUnpickler.find_class(self, module, name)
>>> ImportError: No module named 's3_credentials'
>>> ==================================================================================
>>>
>>>
>>> 4. Here is my whole class:
>>>
>>> ==================================================================================
>>> class FlattenTagFilesFn(beam.DoFn):
>>>     def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>>>         self.s3Bucket = s3Bucket
>>>         self.s3Creds = s3Creds
>>>         self.maxKeys = maxKeys
>>>
>>>         super().__init__()
>>>
>>>     def process(self, elem):
>>>
>>>         if not hasattr(self, 's3Client'):
>>>             import boto3
>>>             self.s3Client = boto3.client(
>>>                 's3',
>>>                 aws_access_key_id=self.s3Creds.awsAccessKeyId,
>>>                 aws_secret_access_key=self.s3Creds.awsSecretAccessKey)
>>>
>>>         (key, info) = elem
>>>
>>>         preFrm = {}
>>>         resp1 = self.s3Client.get_object(Bucket=self.s3Bucket,
>>>                                          Key=info['pre'][0][0])
>>>         yaml1 = yaml.load(resp1['Body'])
>>>
>>>         for elem in yaml1['body']:
>>>             preFrm[elem['frame_tag']['frame_no']] = elem
>>>
>>>         postFrm = {}
>>>         resp2 = self.s3Client.get_object(Bucket=self.s3Bucket,
>>>                                          Key=info['post'][0][0])
>>>         yaml2 = yaml.load(resp2['Body'])
>>>
>>>         for elem in yaml2['body']:
>>>             postFrm[elem['frame_tag']['frame_no']] = elem
>>>
>>>         commonFrmNums = set(list(preFrm.keys())).intersection(list(postFrm.keys()))
>>>
>>>         for f in commonFrmNums:
>>>             frames = Frames(
>>>                 self.s3Bucket,
>>>                 info['pre'][0][0],             # Pre S3Key
>>>                 info['post'][0][0],            # Post S3Key
>>>                 yaml1['head']['operator_id'],  # Pre OperatorId
>>>                 yaml2['head']['operator_id'],  # Post OperatorId
>>>                 preFrm[f],                     # Pre Frame Line
>>>                 postFrm[f],                    # Post Frame Line
>>>                 info['pre'][0][1],             # Pre Last Modified Time
>>>                 info['post'][0][1])            # Post Last Modified Time
>>>
>>>             yield (frames)
>>>
>>>         tagCounts = TagCounts(
>>>             self.s3Bucket,
>>>             yaml1,               # Pre Yaml
>>>             yaml2,               # Post Yaml
>>>             info['pre'][0][0],   # Pre S3Key
>>>             info['post'][0][0],  # Post S3Key
>>>             info['pre'][0][1],   # Pre Last Modified Time
>>>             info['post'][0][1])  # Post Last Modified Time
>>>
>>>         yield beam.pvalue.TaggedOutput('counts', tagCounts)
>>>
>>> ==================================================================================
>>>
>>> I was using super() to create a single boto3 client instance in the ParDo.
>>> Is there a way to call super() in the constructor of a DoFn?
>>>
>>> Thanks,
>>> Yu
>>>
>>>
>>> On Thu, Sep 26, 2019 at 7:49 AM Kyle Weaver <kc...@google.com> wrote:
>>>
>>>> You will need to set the save_main_session pipeline option to True.
>>>>
>>>> Kyle Weaver | Software Engineer | github.com/ibzib |
>>>> kcweaver@google.com
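The ImportError in the second traceback above likely has the same root cause, one level down. The s3Creds object is an instance of a class from the user's own s3_credentials module. The pickled DoFn therefore refers to that module by name, and the worker container must be able to import it.

Below is a minimal sketch using the standard-library pickle and pickletools modules. It assumes a hypothetical class name, `S3Creds`; the thread only shows the module name and the awsAccessKeyId/awsSecretAccessKey attributes. The sketch lists the module references in a payload and compares a custom object with a plain dict. Protocol 3 is used so that each reference appears as a simple GLOBAL opcode.

There are two ways around the problem, stated here as options rather than a confirmed fix:

- Ship the module to the workers, for example with the setup_file or extra_packages options that appear in the options dump above.
- Pass plain values instead of the custom object.

```python
import pickle
import pickletools
import sys
import types


def required_globals(payload):
    """List the 'module name' references a protocol-3 pickle will try to import."""
    return {arg for op, arg, _ in pickletools.genops(payload) if op.name == "GLOBAL"}


# Hypothetical stand-in for the user's s3_credentials module and its class.
creds_module = types.ModuleType("s3_credentials")


class S3Creds:
    def __init__(self, key_id, secret):
        self.awsAccessKeyId = key_id
        self.awsSecretAccessKey = secret


S3Creds.__module__ = "s3_credentials"
S3Creds.__qualname__ = "S3Creds"
creds_module.S3Creds = S3Creds
sys.modules["s3_credentials"] = creds_module

as_object = pickle.dumps(S3Creds("example-id", "example-secret"), protocol=3)
as_dict = pickle.dumps({"awsAccessKeyId": "example-id",
                        "awsSecretAccessKey": "example-secret"}, protocol=3)

print(required_globals(as_object))  # {'s3_credentials S3Creds'}
print(required_globals(as_dict))    # set()

# Simulate a worker image in which s3_credentials is not installed.
del sys.modules["s3_credentials"]
try:
    pickle.loads(as_object)
except ImportError as err:  # ModuleNotFoundError is a subclass of ImportError
    worker_error = err
    print(type(err).__name__, err)
```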
Re: How do you call a user-defined ParDo class from the pipeline in the Portable Runner (Apache Flink)?
Posted by Yu Watanabe <yu...@gmail.com>.
Thank you for the help.
I have chosen to remove the super().__init__() call.
Thanks,
Yu
Re: How do you call a user-defined ParDo class from the pipeline in the Portable Runner (Apache Flink)?
Posted by Ankur Goenka <go...@google.com>.
super() has some issues while pickling in Python 3. Please refer to
https://github.com/uqfoundation/dill/issues/300 for more details.
Removing the reference to super() in your DoFn should help.
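To make this point concrete: zero-argument super() works only because the compiler gives the method a hidden `__class__` closure cell that refers back to the enclosing class. A serializer that pickles the class by value must therefore handle a method that refers to the very class being serialized. As far as I know, pickling by value is dill's default for classes defined in `__main__`. The linked dill issue tracks problems in that area, and the details depend on the dill version.

Naming the base class explicitly avoids the cell. The sketch below uses a hypothetical `DoFnBase` in place of `beam.DoFn` and runs without Beam or dill.

```python
class DoFnBase:
    """Hypothetical stand-in for beam.DoFn."""

    def __init__(self):
        self.base_initialized = True


class WithSuper(DoFnBase):
    def __init__(self):
        super().__init__()  # zero-argument form


class WithExplicitBase(DoFnBase):
    def __init__(self):
        DoFnBase.__init__(self)  # base class named explicitly


for cls in (WithSuper, WithExplicitBase):
    # co_freevars names the closure cells the function reads at call time.
    print(cls.__name__, cls.__init__.__code__.co_freevars)

# The hidden cell used by zero-argument super() points back at the class itself.
super_init = WithSuper.__init__
cell = super_init.__closure__[super_init.__code__.co_freevars.index("__class__")]
print(cell.cell_contents is WithSuper)  # True

# Both forms still run the base-class constructor.
print(WithSuper().base_initialized, WithExplicitBase().base_initialized)  # True True
```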
Re: How do you call user defined ParDo class from the pipeline in Portable Runner (Apache Flink) ?
Posted by Yu Watanabe <yu...@gmail.com>.
Thank you for the reply.
Setting "save_main_session" did not fix the error. However, the situation has changed.
1. Output of get_all_options(): "save_main_session" is set to True.
=================================================================================
2019-09-26 09:04:11,586 DEBUG Pipeline Options:
{'wait_until_finish_duration': None, 'update': False, 'min_cpu_platform':
None, 'dataflow_endpoint': 'https://dataflow.googleapis.com',
'environment_config': 'asia.gcr.io/creationline001/beam/python3:latest',
'machine_type': None, 'enable_streaming_engine': False, 'sdk_location':
'default', 'profile_memory': False, 'max_num_workers': None,
'type_check_strictness': 'DEFAULT_TO_ANY', 'streaming': False,
'setup_file': None, 'network': None, 'on_success_matcher': None,
'requirements_cache': None, 'service_account_email': None,
'environment_type': 'DOCKER', 'disk_type': None, 'labels': None,
'profile_location': None, 'direct_runner_use_stacked_bundle': True,
'use_public_ips': None, ***** 'save_main_session': True, *******
'direct_num_workers': 1, 'num_workers': None,
'worker_harness_container_image': None, 'template_location': None,
'hdfs_port': None, 'flexrs_goal': None, 'profile_cpu': False,
'transform_name_mapping': None, 'profile_sample_rate': 1.0, 'runner':
'PortableRunner', 'project': None, 'dataflow_kms_key': None,
'job_endpoint': 'localhost:8099', 'extra_packages': None,
'environment_cache_millis': 0, 'dry_run': False, 'autoscaling_algorithm':
None, 'staging_location': None, 'job_name': None, 'no_auth': False,
'runtime_type_check': False, 'direct_runner_bundle_repeat': 0,
'subnetwork': None, 'pipeline_type_check': True, 'hdfs_user': None,
'dataflow_job_file': None, 'temp_location': None, 'sdk_worker_parallelism':
0, 'zone': None, 'experiments': ['beam_fn_api'], 'hdfs_host': None,
'disk_size_gb': None, 'dataflow_worker_jar': None, 'requirements_file':
None, 'beam_plugins': None, 'pubsubRootUrl': None, 'region': None}
=================================================================================
2. The error in the TaskManager log did not change.
==================================================================================
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in find_class
    return StockUnpickler.find_class(self, module, name)
AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module 'apache_beam.runners.worker.sdk_worker_main' from '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'>
==================================================================================
3. However, if I comment out "super().__init__()" in my code, the error changes:
==================================================================================
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1078, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 265, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, in loads
    return load(file, ignore)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, in load
    obj = pik.load()
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in find_class
    return StockUnpickler.find_class(self, module, name)
ImportError: No module named 's3_credentials'
==================================================================================
4. My whole class is below.
==================================================================================
import apache_beam as beam
import yaml

# Frames and TagCounts are the poster's own classes, defined elsewhere (not shown).


class FlattenTagFilesFn(beam.DoFn):
    def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
        self.s3Bucket = s3Bucket
        self.s3Creds = s3Creds
        self.maxKeys = maxKeys
        super().__init__()

    def process(self, elem):
        # Create the boto3 client lazily, once per DoFn instance.
        if not hasattr(self, 's3Client'):
            import boto3
            self.s3Client = boto3.client(
                's3',
                aws_access_key_id=self.s3Creds.awsAccessKeyId,
                aws_secret_access_key=self.s3Creds.awsSecretAccessKey)

        (key, info) = elem

        # Index the "pre" file's lines by frame number.
        # safe_load: bare yaml.load() without a Loader is deprecated in
        # PyYAML 5.1+ and an error in PyYAML 6.0.
        preFrm = {}
        resp1 = self.s3Client.get_object(Bucket=self.s3Bucket,
                                         Key=info['pre'][0][0])
        yaml1 = yaml.safe_load(resp1['Body'])
        for elem in yaml1['body']:
            preFrm[elem['frame_tag']['frame_no']] = elem

        # Index the "post" file's lines by frame number.
        postFrm = {}
        resp2 = self.s3Client.get_object(Bucket=self.s3Bucket,
                                         Key=info['post'][0][0])
        yaml2 = yaml.safe_load(resp2['Body'])
        for elem in yaml2['body']:
            postFrm[elem['frame_tag']['frame_no']] = elem

        # Emit one Frames record per frame number present in both files.
        commonFrmNums = set(preFrm.keys()).intersection(postFrm.keys())
        for f in commonFrmNums:
            frames = Frames(
                self.s3Bucket,
                info['pre'][0][0],             # Pre S3Key
                info['post'][0][0],            # Post S3Key
                yaml1['head']['operator_id'],  # Pre OperatorId
                yaml2['head']['operator_id'],  # Post OperatorId
                preFrm[f],                     # Pre Frame Line
                postFrm[f],                    # Post Frame Line
                info['pre'][0][1],             # Pre Last Modified Time
                info['post'][0][1])            # Post Last Modified Time
            yield frames

        # Emit the tag counts on the 'counts' tagged output.
        tagCounts = TagCounts(
            self.s3Bucket,
            yaml1,               # Pre Yaml
            yaml2,               # Post Yaml
            info['pre'][0][0],   # Pre S3Key
            info['post'][0][0],  # Post S3Key
            info['pre'][0][1],   # Pre Last Modified Time
            info['post'][0][1])  # Post Last Modified Time
        yield beam.pvalue.TaggedOutput('counts', tagCounts)
==================================================================================
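The sketch below uses only the standard library to illustrate the failure mode. It assumes the DoFn is being pickled by reference, which is what the `find_class` frames in both tracebacks suggest. Under that assumption, the pickle stores only a module name and a class name, and the worker resolves them by importing that module and looking the name up. On the worker, the main module is `apache_beam.runners.worker.sdk_worker_main`, which defines no `FlattenTagFilesFn`, so the lookup raises the AttributeError. The later `No module named 's3_credentials'` error looks like the same lookup failing one step earlier, this time for the class of the `s3Creds` object. The sketch uses plain `pickle` rather than dill or Beam's pickler, and a hypothetical stand-in class, so it shows the mechanism only.

```python
import pickle
import sys


class FlattenTagFilesFn(object):
    """Hypothetical stand-in for the DoFn; only how it pickles matters."""

    def __init__(self, s3Bucket, maxKeys=1000):
        self.s3Bucket = s3Bucket
        self.maxKeys = maxKeys


def unpickle_without_class(payload, cls):
    """Unpickle `payload` while `cls` is temporarily removed from its module,
    mimicking a worker whose main module never defined the class.
    Returns the error message, or None if unpickling succeeds."""
    module = sys.modules[cls.__module__]
    saved = getattr(module, cls.__name__)
    delattr(module, cls.__name__)
    try:
        pickle.loads(payload)
        return None
    except AttributeError as exc:
        return str(exc)
    finally:
        # Put the class back so the rest of the program is unaffected.
        setattr(module, cls.__name__, saved)


payload = pickle.dumps(FlattenTagFilesFn('my-bucket'))

# The pickle refers to the class by module name and class name;
# it does not contain the class's code.
refers_by_name = (FlattenTagFilesFn.__module__.encode() in payload
                  and b'FlattenTagFilesFn' in payload)
error = unpickle_without_class(payload, FlattenTagFilesFn)

print(refers_by_name)  # True
print(error)
```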
I was using super() to create a single boto3 client instance in the ParDo.
May I ask, is there a way to call super() in the constructor of the DoFn passed to ParDo?
Thanks,
Yu
On Thu, Sep 26, 2019 at 7:49 AM Kyle Weaver <kc...@google.com> wrote:
> You will need to set the save_main_session pipeline option to True.
Re: How do you call user defined ParDo class from the pipeline in Portable Runner (Apache Flink) ?
Posted by Kyle Weaver <kc...@google.com>.
You will need to set the save_main_session pipeline option to True.
Kyle Weaver | Software Engineer | github.com/ibzib | kcweaver@google.com
On Wed, Sep 25, 2019 at 3:44 PM Yu Watanabe <yu...@gmail.com> wrote: