Posted to user@beam.apache.org by Yu Watanabe <yu...@gmail.com> on 2019/09/26 06:27:25 UTC

How to import external module inside ParDo using Apache Flink ?

Hello.

I would like to ask for help with resolving a dependency issue for an imported module.

I have the directory structure below, and I am trying to import the Frames class from frames.py into main.py.
=========================================
quality-validation/
    bin/
        setup.py
        main.py
        modules/
            frames.py
            <TRIMMED>
=========================================

However, when I run the pipeline, I get the error below on the TaskManager.
=========================================
<TRIMMED>
  File "apache_beam/runners/common.py", line 942, in
apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 143, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 593, in
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 594, in
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 799, in
apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/common.py", line 805, in
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 872, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.5/site-packages/future/utils/__init__.py",
line 419, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 803, in
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 465, in
apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 918, in
apache_beam.runners.common._OutputProcessor.process_outputs
  File "/home/admin/quality-validation/bin/main.py", line 44, in process
ImportError: No module named 'frames' [while running
'combined:flat/ParDo(FlattenTagFilesFn)']
=========================================

I import the modules both in the global context and at the top of the process function.
=========================================
[main.py]
#
# Standard library imports
#
import logging
import pprint
import sys
sys.path.append("{0}/modules".format(sys.path[0]))
sys.path.append("{0}/modules/vendor".format(sys.path[0]))

#
# Related third party imports
#
import apache_beam as beam

#
# Local application/library specific imports
#
import utils
from pipeline_wrapper import pipelineWrapper
from tag_counts import TagCounts
from tags import Tags

<TRIMMED>

class FlattenTagFilesFn(beam.DoFn):
    def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
        beam.DoFn.__init__(self)

        self.s3Bucket = s3Bucket
        self.s3Creds  = s3Creds
        self.maxKeys  = maxKeys

    def process(self, elem):
        import yaml
        from frames import Frames

        if not hasattr(self, 's3Client'):
            import boto3
            self.s3Client = boto3.client('s3',
                                aws_access_key_id=self.s3Creds[0],
                                aws_secret_access_key=self.s3Creds[1])

        (key, info) = elem

        preFrm = {}
        resp1 = self.s3Client.get_object(Bucket=self.s3Bucket, Key=info['pre'][0][0])
        yaml1 = yaml.load(resp1['Body'])

        for elem in yaml1['body']:
            preFrm[ elem['frame_tag']['frame_no'] ] = elem

        postFrm = {}
        resp2 = self.s3Client.get_object(Bucket=self.s3Bucket, Key=info['post'][0][0])
        yaml2 = yaml.load(resp2['Body'])

        for elem in yaml2['body']:
            postFrm[ elem['frame_tag']['frame_no'] ] = elem

        commonFrmNums = set(list(preFrm.keys())).intersection(list(postFrm.keys()))

        for f in commonFrmNums:
            frames = Frames(
                          self.s3Bucket,
                          info['pre'][0][0],            # Pre S3Key
                          info['post'][0][0],           # Post S3Key
                          yaml1['head']['operator_id'], # Pre OperatorId
                          yaml2['head']['operator_id'], # Post OperatorId
                          preFrm[f],                    # Pre Frame Line
                          postFrm[f],                   # Post Frame Line
                          info['pre'][0][1],            # Pre Last Modified Time
                          info['post'][0][1])           # Post Last Modified Time

            yield (frames)

        tagCounts = TagCounts(
                         self.s3Bucket,
                         yaml1,               # Pre Yaml
                         yaml2,               # Post Yaml
                         info['pre'][0][0],   # Pre S3Key
                         info['post'][0][0],  # Post S3Key
                         info['pre'][0][1],   # Pre Last Modified Time
                         info['post'][0][1] ) # Post Last Modified Time

        yield beam.pvalue.TaggedOutput('counts', tagCounts)
=========================================

My pipeline options are below. I tried with and without "setup_file", but it made no difference.
=========================================
        options = PipelineOptions([
                      "--runner=PortableRunner",
                      "--environment_config={0}".format(self.__docker_registry),
                      "--environment_type=DOCKER",
                      "--experiments=beam_fn_api",
                      "--job_endpoint=localhost:8099"
                  ])
        options.view_as(SetupOptions).save_main_session = True
        options.view_as(SetupOptions).setup_file = '/home/admin/quality-validation/bin/setup.py'
=========================================

Is it possible to resolve a dependency on an external module inside a ParDo when using Apache Flink?

Thanks,
Yu Watanabe

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.tennis@gmail.com
LinkedIn: https://www.linkedin.com/in/yuwatanabe1 | Twitter: https://twitter.com/yuwtennis

Re: How to import external module inside ParDo using Apache Flink ?

Posted by Yu Watanabe <yu...@gmail.com>.
I did not answer Kyle's question. Sorry about that.

> Did you try moving the imports from the process function to the top of
main.py?

  Yes. Consequently, I moved all imports to the top of each file unless I specifically needed to import inside the function.

On Mon, Oct 7, 2019 at 4:49 PM Yu Watanabe <yu...@gmail.com> wrote:

> <quoted message trimmed; see the full reply below>


-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.tennis@gmail.com
LinkedIn: https://www.linkedin.com/in/yuwatanabe1 | Twitter: https://twitter.com/yuwtennis

Re: How to import external module inside ParDo using Apache Flink ?

Posted by Yu Watanabe <yu...@gmail.com>.
Thank you for the comment.

I finally got this working. I would like to share my experience for people who are new to the portable runner.
Below is what I did to call functions and classes from an external package.

1. As Kyle said, I needed 'save_main_session' so that sys.path persists after pickling.

2. I needed to ship all related files to the worker nodes using the "extra_package" option to resolve the dependency.
https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/

3. I needed to write the import statements explicitly, otherwise I got the error below in the task manager.
It looks like extra packages are installed into "/usr/local/lib/python3.5/site-packages", so imports must use the PKG.MODULENAME format to work.
================================================================================================
import utils

...

  File "/usr/local/lib/python3.5/site-packages/modules/beam_pardo.py", line
18, in <module>
    import utils
ImportError: No module named 'utils'
================================================================================================

Below are the import statements for the external package in main.py. The other files follow the same pattern.
================================================================================================
#
# Local application/library specific imports
#
import pkg_aif.utils as ut
from pkg_aif.beam_pardo import VerifyFn
from pkg_aif.beam_pardo import FlattenTagFilesFn
from pkg_aif.beam_states import StatefulBufferingFn
from pkg_aif.pipeline_wrapper import pipelineWrapper
from pkg_aif.frames import Frames
from pkg_aif.tag_counts import TagCounts
from pkg_aif.tags import Tags
from pkg_aif.es_credentials import EsCredentials
from pkg_aif.s3_credentials import S3Credentials
================================================================================================
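
To make item 3 concrete, here is roughly what the top of pkg_aif/beam_pardo.py looks like after the change. This is only a trimmed sketch (the real class bodies are omitted); the point is that every local module is imported through the pkg_aif package instead of by a bare module name.
================================================================================================
# pkg_aif/beam_pardo.py (trimmed sketch)
import apache_beam as beam

# Absolute package imports. A bare "import utils" fails once the package
# is installed into site-packages on the workers, so everything local is
# referenced through pkg_aif.
import pkg_aif.utils as ut
from pkg_aif.frames import Frames
from pkg_aif.tag_counts import TagCounts


class FlattenTagFilesFn(beam.DoFn):
    ...


class VerifyFn(beam.DoFn):
    ...
================================================================================================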

Related information follows below.

Full options for PipelineOptions.
================================================================================================
        options = PipelineOptions([
                      "--runner=PortableRunner",
                      "--environment_config={0}".format(DOCKER_REGISTRY),
                      "--environment_type=DOCKER",
                      "--experiments=beam_fn_api",
                      "--parallelism={0}".format(PARALLELISM),
                      "--job_endpoint=localhost:8099",
                      "--extra_package=PATH_TO_SDIST"
                  ])
        options.view_as(SetupOptions).save_main_session = True

        return beam.Pipeline(options=options)
================================================================================================
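
For reference, PATH_TO_SDIST above is just the source tarball that "python setup.py sdist" writes into the dist directory shown further below. A minimal sketch of how the option can point at it (the file name follows the name and version in setup.py, so quality_validation-0.1.tar.gz in my case):
================================================================================================
import os

from apache_beam.options.pipeline_options import PipelineOptions

# Built beforehand with "python setup.py sdist" from the bin directory.
SDIST = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                     'dist', 'quality_validation-0.1.tar.gz')

options = PipelineOptions([
    # ... same runner/environment options as above ...
    "--extra_package={0}".format(SDIST),
])
================================================================================================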

My setup.py is below.
================================================================================================
import setuptools

REQUIRED_PACKAGES = [
            'apache-beam==2.15.0',
            'elasticsearch>=7.0.0,<8.0.0',
            'urllib3',
            'boto3'
                ]

setuptools.setup(
   author       = 'Yu Watanabe',
   author_email = 'AUTHOR_EMAIL',
   url          = 'URL',
   name         = 'quality_validation',
   version      = '0.1',
   install_requires = REQUIRED_PACKAGES,
   packages     = setuptools.find_packages(),

)
================================================================================================

Listing of the directory containing setup.py.
================================================================================================
admin@ip-172-31-9-89:~/quality-validation-distribute/bin$ ls -l
total 20
drwxr-xr-x 2 admin admin 4096 Oct  2 19:30 dist
-rw-r--r-- 1 admin admin    0 Sep  5 21:21 __init__.py
-rw-r--r-- 1 admin admin 3782 Oct  3 11:02 main.py
drwxr-xr-x 3 admin admin 4096 Oct  3 15:41 pkg_aif
drwxr-xr-x 2 admin admin 4096 Oct  2 19:30 quality_validation.egg-info
-rw-r--r-- 1 admin admin  517 Oct  1 15:21 setup.py
================================================================================================
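
For completeness, pkg_aif itself is a regular package with its own __init__.py (otherwise setuptools.find_packages() would not pick it up). Its layout is roughly as below; the module names are inferred from the imports listed above, and anything else is trimmed.
================================================================================================
bin/
    setup.py
    main.py
    pkg_aif/
        __init__.py
        utils.py
        beam_pardo.py
        beam_states.py
        pipeline_wrapper.py
        frames.py
        tag_counts.py
        tags.py
        es_credentials.py
        s3_credentials.py
================================================================================================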

Thanks,
Yu Watanabe

On Fri, Sep 27, 2019 at 3:23 AM Kyle Weaver <kc...@google.com> wrote:

> Did you try moving the imports from the process function to the top of
> main.py?
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcweaver@google.com
>
>
> On Wed, Sep 25, 2019 at 11:27 PM Yu Watanabe <yu...@gmail.com> wrote:
>
>> <quoted original message trimmed; see the first message in this thread>

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.tennis@gmail.com
LinkedIn: https://www.linkedin.com/in/yuwatanabe1 | Twitter: https://twitter.com/yuwtennis

Re: How to import external module inside ParDo using Apache Flink ?

Posted by Kyle Weaver <kc...@google.com>.
Did you try moving the imports from the process function to the top of
main.py?

Kyle Weaver | Software Engineer | github.com/ibzib | kcweaver@google.com


On Wed, Sep 25, 2019 at 11:27 PM Yu Watanabe <yu...@gmail.com> wrote:

> <quoted original message trimmed; see the first message in this thread>