Posted to user@beam.apache.org by Yu Watanabe <yu...@gmail.com> on 2019/10/07 07:49:30 UTC
Re: How to import external module inside ParDo using Apache Flink ?
Thank you for the comment.
I finally got this working, and I would like to share my experience for people
who are new to the portable runner.
These are the things I had to do to call functions and classes from an
external package:
1. As Kyle said, I needed 'save_main_session' so that the sys.path changes
would persist after pickling.
2. I needed to push all related files to the worker nodes using the
"--extra_package" option to resolve the dependencies.
https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/
3. I needed to write import statements with fully qualified package names;
otherwise I got the error below in the task manager.
It looks like external packages are installed into
"/usr/local/lib/python3.5/site-packages", and imports must use the
PKG.MODULENAME format to work.
================================================================================================
import utils
...
  File "/usr/local/lib/python3.5/site-packages/modules/beam_pardo.py", line 18, in <module>
    import utils
ImportError: No module named 'utils'
================================================================================================
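For context on why the workers could not find these modules: Python's pickle serializes an instance by recording its class's module and qualified name, not the class's source, so whichever process unpickles it must be able to import that module itself. Beam's pickler builds on dill, which generally handles classes from importable modules the same way. Below is a minimal stdlib-only sketch of that behavior with no Beam involved; the `frames_demo` module and the helper name are hypothetical stand-ins for modules/frames.py.

```python
import importlib
import pickle
import sys
import tempfile
from pathlib import Path


def unpickle_after_module_disappears():
    """Pickle an instance whose class lives in a module that is importable only
    through a sys.path entry, then try to load the pickle once that entry is gone.

    Returns the ImportError raised while unpickling, or None if loading succeeded.
    """
    with tempfile.TemporaryDirectory() as src_dir:
        # Hypothetical stand-in for modules/frames.py on the driver machine.
        Path(src_dir, "frames_demo.py").write_text(
            "class Frames:\n"
            "    def __init__(self, n):\n"
            "        self.n = n\n"
        )
        sys.path.append(src_dir)  # analogous to sys.path.append(...) in main.py
        importlib.invalidate_caches()
        try:
            frames_demo = importlib.import_module("frames_demo")
            payload = pickle.dumps(frames_demo.Frames(3))
        finally:
            # Simulate a worker process where the module was never importable.
            sys.path.remove(src_dir)
            sys.modules.pop("frames_demo", None)

    # The payload only names "frames_demo" and "Frames"; it carries no class code.
    try:
        pickle.loads(payload)
    except ImportError as exc:  # ModuleNotFoundError subclasses ImportError
        return exc
    return None
```

On a real cluster, the corresponding fix is to make the module importable on the worker itself, which is what shipping the package with "--extra_package" achieves.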
Below are the import statements for the external package in main.py. The
other files follow the same convention.
================================================================================================
#
# Local application/library specific imports
#
import pkg_aif.utils as ut
from pkg_aif.beam_pardo import VerifyFn
from pkg_aif.beam_pardo import FlattenTagFilesFn
from pkg_aif.beam_states import StatefulBufferingFn
from pkg_aif.pipeline_wrapper import pipelineWrapper
from pkg_aif.frames import Frames
from pkg_aif.tag_counts import TagCounts
from pkg_aif.tags import Tags
from pkg_aif.es_credentials import EsCredentials
from pkg_aif.s3_credentials import S3Credentials
================================================================================================
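Item 3 can be reproduced without Beam. In Python 3, `import utils` inside a package module is an absolute import, so it only succeeds when `utils` is itself a top-level module on sys.path. On the driver, main.py's sys.path.append(...) made that true, but once the code is installed as a package in site-packages, only the package name is top-level. A minimal sketch, assuming a throwaway package `pkg_demo` (a hypothetical stand-in for `pkg_aif`, with `demo_utils` playing the role of `utils`) whose parent directory is put on sys.path the way site-packages would be:

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Import statement written at the top of pkg_demo/beam_pardo.py in each variant.
STATEMENTS = {
    "bare": "import demo_utils",                      # like `import utils`
    "qualified": "import pkg_demo.demo_utils as ut",  # like `import pkg_aif.utils as ut`
}


def import_beam_pardo(style):
    """Create pkg_demo/{__init__,demo_utils,beam_pardo}.py in a temporary
    'site-packages' directory, import pkg_demo.beam_pardo, then clean up.

    Returns "ok" on success, otherwise the ImportError message."""
    with tempfile.TemporaryDirectory() as site_dir:
        pkg = Path(site_dir, "pkg_demo")
        pkg.mkdir()
        (pkg / "__init__.py").write_text("")
        (pkg / "demo_utils.py").write_text("VALUE = 42\n")
        (pkg / "beam_pardo.py").write_text(STATEMENTS[style] + "\n")
        sys.path.insert(0, site_dir)
        importlib.invalidate_caches()
        try:
            importlib.import_module("pkg_demo.beam_pardo")
            return "ok"
        except ImportError as exc:
            return str(exc)
        finally:
            # Undo everything so the next call starts from a clean state.
            sys.path.remove(site_dir)
            for name in ("pkg_demo", "pkg_demo.demo_utils", "pkg_demo.beam_pardo"):
                sys.modules.pop(name, None)
```

The qualified form resolves because only `pkg_demo` has to be on sys.path; the bare form would also need the package directory itself on sys.path, which main.py's sys.path.append(...) provided on the driver only.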
Related information is below.
Full options for PipelineOptions:
================================================================================================
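import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

def create_pipeline():  # hypothetical wrapper; the original snippet is a function body
    options = PipelineOptions([
        "--runner=PortableRunner",
        "--environment_config={0}".format(DOCKER_REGISTRY),
        "--environment_type=DOCKER",
        "--experiments=beam_fn_api",
        "--parallelism={0}".format(PARALLELISM),
        "--job_endpoint=localhost:8099",
        "--extra_package=PATH_TO_SDIST"
    ])
    options.view_as(SetupOptions).save_main_session = True
    return beam.Pipeline(options=options)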
================================================================================================
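The PATH_TO_SDIST placeholder above stands for the source tarball that `python setup.py sdist` writes into the `dist/` directory that appears in the directory listing. A minimal sketch, assuming that layout, of filling it in programmatically; `find_sdist` and `portable_runner_args` are hypothetical helpers (not Beam APIs), and the flag strings are copied from the options above.

```python
import glob
import os


def find_sdist(dist_dir):
    """Return the most recently modified .tar.gz in dist_dir (hypothetical helper)."""
    candidates = glob.glob(os.path.join(dist_dir, "*.tar.gz"))
    if not candidates:
        raise FileNotFoundError("no .tar.gz sdist found in {0}".format(dist_dir))
    # Pick the newest tarball in case several versions have accumulated.
    return max(candidates, key=os.path.getmtime)


def portable_runner_args(docker_image, parallelism, sdist_path,
                         job_endpoint="localhost:8099"):
    """Assemble the same flags as above, with the sdist path filled in."""
    return [
        "--runner=PortableRunner",
        "--environment_config={0}".format(docker_image),
        "--environment_type=DOCKER",
        "--experiments=beam_fn_api",
        "--parallelism={0}".format(parallelism),
        "--job_endpoint={0}".format(job_endpoint),
        "--extra_package={0}".format(sdist_path),
    ]
```

The returned list could then be passed as PipelineOptions(portable_runner_args(DOCKER_REGISTRY, PARALLELISM, find_sdist("dist"))). Choosing the newest tarball is only one convenient policy; pinning an exact file name works just as well.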
My setup.py is below.
================================================================================================
import setuptools

REQUIRED_PACKAGES = [
    'apache-beam==2.15.0',
    'elasticsearch>=7.0.0,<8.0.0',
    'urllib3',
    'boto3'
]

setuptools.setup(
    author='Yu Watanabe',
    author_email='AUTHOR_EMAIL',
    url='URL',
    name='quality_validation',
    version='0.1',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
)
================================================================================================
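`packages=setuptools.find_packages()` only picks up regular packages, i.e. directories that contain an `__init__.py`, so a directory without one is not installed as a package on the workers. A stdlib-only sketch of a pre-build sanity check; `find_package_dirs` is a hypothetical helper that roughly mirrors that rule rather than calling setuptools itself.

```python
import os


def find_package_dirs(root):
    """Return dotted names of directories under root that contain __init__.py,
    descending only through such directories and skipping names containing '.'.
    This roughly mirrors which packages setuptools.find_packages(root) reports."""
    found = []

    def walk(path, prefix):
        for entry in sorted(os.listdir(path)):
            full = os.path.join(path, entry)
            if "." in entry or not os.path.isdir(full):
                continue
            if not os.path.isfile(os.path.join(full, "__init__.py")):
                continue  # not a regular package; its whole subtree is skipped
            name = prefix + entry
            found.append(name)
            walk(full, name + ".")

    walk(root, "")
    return found
```

Run from the directory that holds setup.py, a result containing "pkg_aif" suggests find_packages() will include it as well.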
Listing of the directory containing setup.py:
================================================================================================
admin@ip-172-31-9-89:~/quality-validation-distribute/bin$ ls -l
total 20
drwxr-xr-x 2 admin admin 4096 Oct 2 19:30 dist
-rw-r--r-- 1 admin admin 0 Sep 5 21:21 __init__.py
-rw-r--r-- 1 admin admin 3782 Oct 3 11:02 main.py
drwxr-xr-x 3 admin admin 4096 Oct 3 15:41 pkg_aif
drwxr-xr-x 2 admin admin 4096 Oct 2 19:30 quality_validation.egg-info
-rw-r--r-- 1 admin admin 517 Oct 1 15:21 setup.py
================================================================================================
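Since the workers install the tarball passed via "--extra_package", one way to confirm that pkg_aif actually made it into the sdist built in dist/ is to list the archive. A minimal stdlib sketch; `sdist_members` is a hypothetical helper, and it assumes the usual `<name>-<version>/` top-level directory that `python setup.py sdist` produces (for example quality_validation-0.1/).

```python
import tarfile


def sdist_members(sdist_path, package="pkg_aif"):
    """List the .py files of `package` inside an sdist tarball, with the
    leading '<name>-<version>/' directory stripped from each path."""
    with tarfile.open(sdist_path, "r:gz") as tar:
        names = tar.getnames()
    members = []
    for name in names:
        # e.g. "quality_validation-0.1/pkg_aif/utils.py" -> "pkg_aif/utils.py"
        parts = name.split("/", 1)
        if (len(parts) == 2 and parts[1].startswith(package + "/")
                and parts[1].endswith(".py")):
            members.append(parts[1])
    return sorted(members)
```

An empty result would point at the packaging step rather than at the pipeline code.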
Thanks,
Yu Watanabe
On Fri, Sep 27, 2019 at 3:23 AM Kyle Weaver <kc...@google.com> wrote:
> Did you try moving the imports from the process function to the top of
> main.py?
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcweaver@google.com
>
>
> On Wed, Sep 25, 2019 at 11:27 PM Yu Watanabe <yu...@gmail.com>
> wrote:
>
>> Hello.
>>
>> I would like to ask for help with resolving a dependency issue with an
>> imported module.
>>
>> I have the directory structure below, and I am trying to import the Frames
>> class from frames.py into main.py.
>> =========================================
>> quality-validation/
>>     bin/
>>         setup.py
>>         main.py
>>         modules/
>>             frames.py
>>             <TRIMMED>
>> =========================================
>>
>> However, when I run the pipeline, I get the error below in the TaskManager.
>> =========================================
>> <TRIMMED>
>>   File "apache_beam/runners/common.py", line 942, in apache_beam.runners.common._OutputProcessor.process_outputs
>>   File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>>   File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
>>   File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
>>   File "apache_beam/runners/common.py", line 799, in apache_beam.runners.common.DoFnRunner.receive
>>   File "apache_beam/runners/common.py", line 805, in apache_beam.runners.common.DoFnRunner.process
>>   File "apache_beam/runners/common.py", line 872, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>>   File "/usr/local/lib/python3.5/site-packages/future/utils/__init__.py", line 419, in raise_with_traceback
>>     raise exc.with_traceback(traceback)
>>   File "apache_beam/runners/common.py", line 803, in apache_beam.runners.common.DoFnRunner.process
>>   File "apache_beam/runners/common.py", line 465, in apache_beam.runners.common.SimpleInvoker.invoke_process
>>   File "apache_beam/runners/common.py", line 918, in apache_beam.runners.common._OutputProcessor.process_outputs
>>   File "/home/admin/quality-validation/bin/main.py", line 44, in process
>> ImportError: No module named 'frames' [while running 'combined:flat/ParDo(FlattenTagFilesFn)']
>> =========================================
>>
>> I import the modules in the global context and also at the top of the
>> process function.
>> =========================================
>> [main.py]
>> #
>> # Standard library imports
>> #
>> import logging
>> import pprint
>> import sys
>> sys.path.append("{0}/modules".format(sys.path[0]))
>> sys.path.append("{0}/modules/vendor".format(sys.path[0]))
>>
>> #
>> # Related third party imports
>> #
>> import apache_beam as beam
>>
>> #
>> # Local application/library specific imports
>> #
>> import utils
>> from pipeline_wrapper import pipelineWrapper
>> from tag_counts import TagCounts
>> from tags import Tags
>>
>> <TRIMMED>
>>
>> class FlattenTagFilesFn(beam.DoFn):
>>     def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>>         beam.DoFn.__init__(self)
>>
>>         self.s3Bucket = s3Bucket
>>         self.s3Creds = s3Creds
>>         self.maxKeys = maxKeys
>>
>>     def process(self, elem):
>>         import yaml
>>         from frames import Frames
>>
>>         if not hasattr(self, 's3Client'):
>>             import boto3
>>             self.s3Client = boto3.client('s3',
>>                                          aws_access_key_id=self.s3Creds[0],
>>                                          aws_secret_access_key=self.s3Creds[1])
>>
>>         (key, info) = elem
>>
>>         preFrm = {}
>>         resp1 = self.s3Client.get_object(Bucket=self.s3Bucket,
>>                                          Key=info['pre'][0][0])
>>         yaml1 = yaml.load(resp1['Body'])
>>
>>         for elem in yaml1['body']:
>>             preFrm[ elem['frame_tag']['frame_no'] ] = elem
>>
>>         postFrm = {}
>>         resp2 = self.s3Client.get_object(Bucket=self.s3Bucket,
>>                                          Key=info['post'][0][0])
>>         yaml2 = yaml.load(resp2['Body'])
>>
>>         for elem in yaml2['body']:
>>             postFrm[ elem['frame_tag']['frame_no'] ] = elem
>>
>>         commonFrmNums = set(list(preFrm.keys())).intersection(list(postFrm.keys()))
>>
>>         for f in commonFrmNums:
>>             frames = Frames(
>>                 self.s3Bucket,
>>                 info['pre'][0][0],             # Pre S3Key
>>                 info['post'][0][0],            # Post S3Key
>>                 yaml1['head']['operator_id'],  # Pre OperatorId
>>                 yaml2['head']['operator_id'],  # Post OperatorId
>>                 preFrm[f],                     # Pre Frame Line
>>                 postFrm[f],                    # Post Frame Line
>>                 info['pre'][0][1],             # Pre Last Modified Time
>>                 info['post'][0][1])            # Post Last Modified Time
>>
>>             yield (frames)
>>
>>         tagCounts = TagCounts(
>>             self.s3Bucket,
>>             yaml1,               # Pre Yaml
>>             yaml2,               # Post Yaml
>>             info['pre'][0][0],   # Pre S3Key
>>             info['post'][0][0],  # Post S3Key
>>             info['pre'][0][1],   # Pre Last Modified Time
>>             info['post'][0][1])  # Post Last Modified Time
>>
>>         yield beam.pvalue.TaggedOutput('counts', tagCounts)
>> =========================================
>>
>> My pipeline options are below. I tried with and without "setup_file", but
>> it made no difference.
>> =========================================
>> options = PipelineOptions([
>>     "--runner=PortableRunner",
>>     "--environment_config={0}".format(self.__docker_registry),
>>     "--environment_type=DOCKER",
>>     "--experiments=beam_fn_api",
>>     "--job_endpoint=localhost:8099"
>> ])
>> options.view_as(SetupOptions).save_main_session = True
>> options.view_as(SetupOptions).setup_file = '/home/admin/quality-validation/bin/setup.py'
>> =========================================
>>
>> Is it possible to resolve a ParDo's dependency on an external module
>> when using Apache Flink?
>>
>> Thanks,
>> Yu Watanabe
>>
>> --
>> Yu Watanabe
>> Weekend Freelancer who loves to challenge building data platform
>> yu.w.tennis@gmail.com
>> [image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1> [image:
>> Twitter icon] <https://twitter.com/yuwtennis>
>>
>
Re: How to import external module inside ParDo using Apache Flink ?
Posted by Yu Watanabe <yu...@gmail.com>.
I did not answer Kyle's question. Sorry about that.
> Did you try moving the imports from the process function to the top of
> main.py?
Yes. In the end, I moved all imports to the top of each file, except where
an import had to be done inside the function.