Posted to user@beam.apache.org by Eugene Kirpichov <ek...@gmail.com> on 2020/08/08 01:31:51 UTC

Staged PIP package mysteriously ungzipped, non-installable inside the worker

Hi old Beam friends,

I left Google to work on climate change
<https://www.linkedin.com/posts/eugenekirpichov_i-am-leaving-google-heres-a-snipped-to-activity-6683408492444962816-Mw5U>
and am now doing a short engagement with Pachama <https://pachama.com/>.
Right now I'm trying to get a Beam Python pipeline to work; the pipeline
will use fancy requirements and native dependencies, and we plan to run it
on Cloud Dataflow (so custom containers are not yet an option), so I'm
going straight for the direct PortableRunner as per
https://beam.apache.org/documentation/runtime/environments/.

Basically I can't get a minimal Beam program with a minimal
requirements.txt file to work - the .tar.gz of the dependency mysteriously
ends up being ungzipped and non-installable inside the Docker container
running the worker. Details below.

=== main.py ===
import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:
        (p | 'Create' >> beam.Create(['Hello'])
           | 'Write' >> beam.io.WriteToText('/tmp'))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

=== requirements.txt ===
alembic

When I run the program:
$ python3 main.py
--runner=PortableRunner --job_endpoint=embed
--requirements_file=requirements.txt


I get some normal output and then:

INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b'  File
"/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py",
line 261, in unpack_file\n    untar_file(filename, location)\n  File
"/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py",
line 177, in untar_file\n    tar = tarfile.open(filename, mode)\n  File
"/usr/local/lib/python3.7/tarfile.py", line 1591, in open\n    return
func(name, filemode, fileobj, **kwargs)\n  File
"/usr/local/lib/python3.7/tarfile.py", line 1648, in gzopen\n    raise
ReadError("not a gzip file")\ntarfile.ReadError: not a gzip
file\n2020/08/08 01:17:07 Failed to install required packages: failed to
install requirements: exit status 2\n'
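
For context, the trailing "Failed to install required packages" line comes
from the SDK container's boot program, which installs the staged requirements
roughly like this (a sketch; the exact pip flags the boot code passes may
differ):

$ pip install -r /tmp/staged/requirements.txt --no-index --find-links /tmp/staged

As the traceback shows, pip picks the unpacking mode from the ".gz" extension,
and it is tarfile.open(..., 'r:gz') that raises "not a gzip file".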

This greatly puzzled me and, after some looking, I found something really
surprising. Here is the package in the *directory to be staged*:

$ file
/var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
...: gzip compressed data, was "dist/alembic-1.4.2.tar", last modified: Thu
Mar 19 21:48:31 2020, max compression, original size modulo 2^32 4730880
$ ls -l
/var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
-rw-r--r--  1 jkff  staff  1092045 Aug  7 16:56 ...

So far so good. But here is the same file inside the Docker container (I ssh'd
into the dead container
<https://thorsten-hans.com/how-to-run-commands-in-stopped-docker-containers>
):

# file /tmp/staged/alembic-1.4.2.tar.gz
/tmp/staged/alembic-1.4.2.tar.gz: POSIX tar archive (GNU)
# ls -l /tmp/staged/alembic-1.4.2.tar.gz
-rwxr-xr-x 1 root root 4730880 Aug  8 01:17 /tmp/staged/alembic-1.4.2.tar.gz
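
A quick way to double-check this in plain Python, without the file utility
(same path as above; gzip data must begin with the magic bytes 0x1f 0x8b,
which a raw tar does not):

with open('/tmp/staged/alembic-1.4.2.tar.gz', 'rb') as f:
    print(f.read(2) == b'\x1f\x8b')  # False here: the gzip layer is gone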

The file has clearly been ungzipped, and now of course pip can't install it!
What's going on here? Am I using the direct/portable runner combination
wrong?

Thanks!

-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov

Re: Staged PIP package mysteriously ungzipped, non-installable inside the worker

Posted by Eugene Kirpichov <ek...@gmail.com>.
Thanks all! Sent https://github.com/apache/beam/pull/12619 to
cherrypick into 2.24.



-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov

Re: Staged PIP package mysteriously ungzipped, non-installable inside the worker

Posted by Robert Bradshaw <ro...@google.com>.
I checked Java; it looks like, given the way things are structured, we do not
have that bug there.

On Mon, Aug 17, 2020 at 3:31 PM Robert Bradshaw <ro...@google.com> wrote:
>
> +1
>
> Thanks, Eugene, for finding and fixing this!
>
> > FWIW, most use of Python from the Python Portable Runner uses the
> embedded environment (this is the default direct runner), so
> dependencies are already present.
>
> On Mon, Aug 17, 2020 at 3:19 PM Daniel Oliveira <da...@google.com> wrote:
> >
> > Normally I'd say not to cherry-pick this since the issue is only affecting one runner and isn't really a regression, but given that it's the last Py2 release and there won't be a follow-up release that will be able to include this fix, I think it's worth making an exception this time. There should be at least one release with a working portable runner for Py2 users, given that the portable runner is included in examples on the website. Plus, I'm already waiting for some other cherry-picks, so it won't even delay anything.
> >
> > So yes, Eugene, if you could create a cherry-pick of this change into the release-2.24.0 branch, I'll review and merge it.
> >
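
For anyone following along, a sketch of a typical cherry-pick workflow
(branch and remote names below are illustrative, not prescribed):

$ git fetch origin release-2.24.0
$ git checkout -b cherry-pick-artifact-fix origin/release-2.24.0
$ git cherry-pick <commit-sha-of-the-merged-fix>
$ git push myfork cherry-pick-artifact-fix   # then open a PR against release-2.24.0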
> > On Mon, Aug 17, 2020 at 11:27 AM Valentyn Tymofieiev <va...@google.com> wrote:
> >>
> >> Will defer to the release manager; one reason to cherry-pick is that 2.24.0 will be the last release with Python 2 support, so Py2 users of Portable Python Local Runner might appreciate the fix, since they won't be able to use the next release.
> >>
> >> On Thu, Aug 13, 2020 at 6:28 PM Eugene Kirpichov <ek...@gmail.com> wrote:
> >>>
> >>> +Daniel as in charge of 2.24 per dev@ thread.
> >>>
> >>> On Thu, Aug 13, 2020 at 6:24 PM Eugene Kirpichov <ek...@gmail.com> wrote:
> >>>>
> >>>> The PR is merged.
> >>>>
> >>>> Do folks think this warrants being cherrypicked into v2.24? My hunch is yes, because basically one of the runners (the local portable Python runner) is broken for any production workload (it works only if your pipeline has no dependencies).
> >>>>
> >>>> On Thu, Aug 13, 2020 at 12:56 PM Eugene Kirpichov <ek...@gmail.com> wrote:
> >>>>>
> >>>>> FWIW I sent a PR to fix this: https://github.com/apache/beam/pull/12571
> >>>>>
> >>>>> However, I'm not up to date on the portable test infrastructure and would appreciate guidance on what tests I can add for this.
> >>>>>
> >>>>> On Tue, Aug 11, 2020 at 5:28 PM Eugene Kirpichov <ek...@gmail.com> wrote:
> >>>>>>
> >>>>>> (FYI Sam +sbrother@gmail.com)
> >>>>>>
> >>>>>> On Tue, Aug 11, 2020 at 5:00 PM Eugene Kirpichov <ek...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> OK, I found the bug, and now I don't understand how it could possibly ever have worked. And if this was never tested, then I don't understand why it works after fixing this one bug :)
> >>>>>>>
> >>>>>>> Basically the Python ArtifactStaging/RetrievalService uses FileSystems.open() to read the artifacts to be staged, and FileSystems.open() by default decompresses compressed files based on their extension.
> >>>>>>> I found two such services - one in Python and one in Java. Is the Python one used with an embedded job endpoint and the Java one otherwise? I haven't inspected the Java one, but fixing the Python one does the trick.
> >>>>>>>
> >>>>>>> The fix is this patch:
> >>>>>>>
> >>>>>>> diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py
> >>>>>>> index f2bbf534c3..1f3ec1c0b0 100644
> >>>>>>> --- a/sdks/python/apache_beam/runners/portability/artifact_service.py
> >>>>>>> +++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
> >>>>>>> @@ -41,6 +41,7 @@ import grpc
> >>>>>>>  from future.moves.urllib.request import urlopen
> >>>>>>>
> >>>>>>>  from apache_beam.io import filesystems
> >>>>>>> +from apache_beam.io.filesystems import CompressionTypes
> >>>>>>>  from apache_beam.portability import common_urns
> >>>>>>>  from apache_beam.portability.api import beam_artifact_api_pb2
> >>>>>>>  from apache_beam.portability.api import beam_artifact_api_pb2_grpc
> >>>>>>> @@ -263,7 +264,8 @@ class BeamFilesystemHandler(object):
> >>>>>>>      self._root = root
> >>>>>>>
> >>>>>>>    def file_reader(self, path):
> >>>>>>> -    return filesystems.FileSystems.open(path)
> >>>>>>> +    return filesystems.FileSystems.open(
> >>>>>>> +        path, compression_type=CompressionTypes.UNCOMPRESSED)
> >>>>>>>
> >>>>>>>    def file_writer(self, name=None):
> >>>>>>>      full_path = filesystems.FileSystems.join(self._root, name)
> >>>>>>> diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
> >>>>>>> index 5bf3282250..2684235be0 100644
> >>>>>>> --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
> >>>>>>> +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
> >>>>>>> @@ -45,6 +45,7 @@ from typing import overload
> >>>>>>>  import grpc
> >>>>>>>
> >>>>>>>  from apache_beam.io import filesystems
> >>>>>>> +from apache_beam.io.filesystems import CompressionTypes
> >>>>>>>  from apache_beam.portability import common_urns
> >>>>>>>  from apache_beam.portability import python_urns
> >>>>>>>  from apache_beam.portability.api import beam_artifact_api_pb2_grpc
> >>>>>>> @@ -464,9 +465,13 @@ class GrpcServer(object):
> >>>>>>>                  self.provision_info.provision_info, worker_manager),
> >>>>>>>              self.control_server)
> >>>>>>>
> >>>>>>> +      def open_uncompressed(f):
> >>>>>>> +        return filesystems.FileSystems.open(
> >>>>>>> +            f, compression_type=CompressionTypes.UNCOMPRESSED)
> >>>>>>> +
> >>>>>>>        beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
> >>>>>>>            artifact_service.ArtifactRetrievalService(
> >>>>>>> -              file_reader=filesystems.FileSystems.open),
> >>>>>>> +              file_reader=open_uncompressed),
> >>>>>>>            self.control_server)
> >>>>>>>
> >>>>>>>      self.data_plane_handler = data_plane.BeamFnDataServicer(
> >>>>>>>
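
For context, a minimal sketch of the behavior this patch works around (the
path below is hypothetical; the imports mirror the patch itself):

from apache_beam.io.filesystems import CompressionTypes, FileSystems

path = '/tmp/staged/alembic-1.4.2.tar.gz'  # hypothetical local artifact

# The default compression_type is AUTO: the ".gz" extension makes the
# returned stream decompress transparently, so a staging service reading
# through it serves a raw tar instead of the original .tar.gz.
with FileSystems.open(path) as f:
    print(f.read(2) == b'\x1f\x8b')  # False: gzip magic bytes are gone

# The fix requests the bytes verbatim, so what gets staged is exactly
# what was on disk.
with FileSystems.open(
    path, compression_type=CompressionTypes.UNCOMPRESSED) as f:
    print(f.read(2) == b'\x1f\x8b')  # True: still a valid .tar.gz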
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Aug 11, 2020 at 1:43 PM Eugene Kirpichov <ek...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> Hi Maximilian,
> >>>>>>>>
> >>>>>>>> Thank you - it works fine with the embedded Flink runner (per below, it seems it's not using Docker to run the Python code? What is it using then?).
> >>>>>>>>
> >>>>>>>> However, the original bug appears to be wider than I thought - it is also present if I run --runner=FlinkRunner --environment_type=DOCKER. Seems like something is very broken in local Docker execution in general - I haven't yet verified whether the same error will happen when running on a remote Flink cluster.
> >>>>>>>>
> >>>>>>>> Trying to build my own SDK containers with some more debugging so I can figure out what's going on...
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, Aug 11, 2020 at 7:53 AM Maximilian Michels <mx...@apache.org> wrote:
> >>>>>>>>>
> >>>>>>>>> Looks like you ran into a bug.
> >>>>>>>>>
> >>>>>>>>> You could just run your program without specifying any arguments, since
> >>>>>>>>> running with Python's FnApiRunner should be enough.
> >>>>>>>>>
> >>>>>>>>> Alternatively, how about trying to run the same pipeline with the
> >>>>>>>>> FlinkRunner? Use: --runner=FlinkRunner and do not specify an endpoint.
> >>>>>>>>> It will run the Python code embedded (loopback environment) without
> >>>>>>>>> additional containers.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Max
> >>>>>>>>>
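
Concretely, a sketch of the two invocations Max suggests, assuming the same
main.py and requirements.txt as above:

$ python3 main.py --requirements_file=requirements.txt
$ python3 main.py --runner=FlinkRunner --requirements_file=requirements.txt

The first uses the default FnApiRunner-based direct runner; the second spins
up an embedded Flink cluster and runs the Python code in the loopback
environment, so no SDK container is involved.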
> >>>>>>>>> On 10.08.20 21:59, Eugene Kirpichov wrote:
> >>>>>>>>> > Thanks Valentyn!
> >>>>>>>>> >
> >>>>>>>>> > Good to know that this is a bug (I'll file one), and that Dataflow has
> >>>>>>>>> > an experimental way to use custom containers. I'll try that.
> >>>>>>>>> >
> >>>>>>>>> > On Mon, Aug 10, 2020 at 12:51 PM Valentyn Tymofieiev
> >>>>>>>>> > <valentyn@google.com <ma...@google.com>> wrote:
> >>>>>>>>> >
> >>>>>>>>> >     Hi Eugene,
> >>>>>>>>> >
> >>>>>>>>> >     Good to hear from you. The experience you are describing on Portable
> >>>>>>>>> >     Runner + Docker container in local execution mode is most certainly
> >>>>>>>>> >     a bug; if you have not opened an issue on it, please do so and feel
> >>>>>>>>> >     free to cc me.
> >>>>>>>>> >
> >>>>>>>>> >     I can also reproduce the bug and likewise didn't see anything
> >>>>>>>>> >     obvious immediately; this needs some debugging.
> >>>>>>>>> >
> >>>>>>>>> >     cc: +Ankur Goenka <ma...@google.com> +Kyle Weaver
> >>>>>>>>> >     <ma...@google.com> who recently worked on Portable Runner
> >>>>>>>>> >     and may be interested.
> >>>>>>>>> >
> >>>>>>>>> >     By the way, you should be able to use custom containers with
> >>>>>>>>> >     Dataflow, if you set --experiments=use_runner_v2.
> >>>>>>>>> >
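
For reference, a sketch of such a Dataflow invocation. The project, region,
bucket, and image values below are placeholders, and the container-image flag
name is my assumption about the Dataflow options of this era:

$ python3 main.py \
    --runner=DataflowRunner \
    --project=my-project --region=us-central1 \
    --temp_location=gs://my-bucket/tmp \
    --requirements_file=requirements.txt \
    --experiments=use_runner_v2 \
    --worker_harness_container_image=gcr.io/my-project/beam-python:latest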
> >>>>>>>>> >     On Mon, Aug 10, 2020 at 9:06 AM Eugene Kirpichov
> >>>>>>>>> >     <ekirpichov@gmail.com <ma...@gmail.com>> wrote:
> >>>>>>>>> >
> >>>>>>>>> >         (cc'ing Sam with whom I'm working on this atm)
> >>>>>>>>> >
> >>>>>>>>> >         FWIW I'm still stumped. I've looked through Python, Go and Java
> >>>>>>>>> >         code in the Beam repo that has anything to do with
> >>>>>>>>> >         gzipping/unzipping, and none of it appears to be used in the
> >>>>>>>>> >         artifact staging/retrieval codepaths. I also can't find any
> >>>>>>>>> >         mention of compression/decompression in the container boot code.
> >>>>>>>>> >         My next step will be to add a bunch of debugging, rebuild the
> >>>>>>>>> >         containers, and see what the artifact services think they're
> >>>>>>>>> >         serving.
> >>>>>>>>> >
> >>>>>>>>> >
> >>>>>>>>> >         On Fri, Aug 7, 2020 at 6:47 PM Eugene Kirpichov
> >>>>>>>>> >         <ekirpichov@gmail.com <ma...@gmail.com>> wrote:
> >>>>>>>>> >
> >>>>>>>>> >             Thanks Austin! Good stuff - though note that I am
> >>>>>>>>> >             /not/ using custom containers; I'm just trying to get the
> >>>>>>>>> >             basics to work: a Python pipeline with a simple
> >>>>>>>>> >             requirements.txt file. It feels like this should work
> >>>>>>>>> >             out of the box, so I must be doing something wrong.
> >>>>>>>>> >
> >>>>>>>>> >             On Fri, Aug 7, 2020 at 6:38 PM Austin Bennett
> >>>>>>>>> >             <whatwouldaustindo@gmail.com
> >>>>>>>>> >             <ma...@gmail.com>> wrote:
> >>>>>>>>> >
> >>>>>>>>> >                 I believe only @OrielResearch Eila Arich-Landkof
> >>>>>>>>> >                 <ma...@orielresearch.org> is potentially doing
> >>>>>>>>> >                 applied work with custom containers (there must be others)!
> >>>>>>>>> >
> >>>>>>>>> >                 As a plug for her and @BeamSummit -- I think enough
> >>>>>>>>> >                 related material will be covered (with Conda specifics) here
> >>>>>>>>> >                 -->
> >>>>>>>>> >                 https://2020.beamsummit.org/sessions/workshop-using-conda-on-beam/
> >>>>>>>>> >
> >>>>>>>>> >                 I'm sure others will have more things to say that are
> >>>>>>>>> >                 actually helpful, on-list, before that occurs (~3 weeks).
> >>>>>>>>> >
> >>>>>>>>> >
> >>>>>>>>> >
> >>>>>>>>> >
> >>>>>>>>> >             --
> >>>>>>>>> >             Eugene Kirpichov
> >>>>>>>>> >             http://www.linkedin.com/in/eugenekirpichov
> >>>>>>>>> >
> >>>>>>>>> >
> >>>>>>>>> >
> >>>>>>>>> >         --
> >>>>>>>>> >         Eugene Kirpichov
> >>>>>>>>> >         http://www.linkedin.com/in/eugenekirpichov
> >>>>>>>>> >
> >>>>>>>>> >
> >>>>>>>>> >
> >>>>>>>>> > --
> >>>>>>>>> > Eugene Kirpichov
> >>>>>>>>> > http://www.linkedin.com/in/eugenekirpichov
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> Eugene Kirpichov
> >>>>>>>> http://www.linkedin.com/in/eugenekirpichov
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Eugene Kirpichov
> >>>>>>> http://www.linkedin.com/in/eugenekirpichov
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> Eugene Kirpichov
> >>>>>> http://www.linkedin.com/in/eugenekirpichov
> >>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> Eugene Kirpichov
> >>>>> http://www.linkedin.com/in/eugenekirpichov
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Eugene Kirpichov
> >>>> http://www.linkedin.com/in/eugenekirpichov
> >>>
> >>>
> >>>
> >>> --
> >>> Eugene Kirpichov
> >>> http://www.linkedin.com/in/eugenekirpichov

Re: Staged PIP package mysteriously ungzipped, non-installable inside the worker

Posted by Robert Bradshaw <ro...@google.com>.
+1

Thanks, Eugene, for finding and fixing this!

FWIW, most use of Python from the Python Portable Runner uses the
embedded environment (this is the default direct runner), so
dependencies are already present.

>>>>>>>>> >                     /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
>>>>>>>>> >                     -rw-r--r--  1 jkff  staff  1092045 Aug  7 16:56 ...
>>>>>>>>> >
>>>>>>>>> >                     So far so good. But here is the same file inside the
>>>>>>>>> >                     Docker container (I ssh'd into the dead container
>>>>>>>>> >                     <https://thorsten-hans.com/how-to-run-commands-in-stopped-docker-containers>):
>>>>>>>>> >
>>>>>>>>> >                     # file /tmp/staged/alembic-1.4.2.tar.gz
>>>>>>>>> >                     /tmp/staged/alembic-1.4.2.tar.gz: POSIX tar archive
>>>>>>>>> >                     (GNU)
>>>>>>>>> >                     # ls -l /tmp/staged/alembic-1.4.2.tar.gz
>>>>>>>>> >                     -rwxr-xr-x 1 root root 4730880 Aug  8 01:17
>>>>>>>>> >                     /tmp/staged/alembic-1.4.2.tar.gz
>>>>>>>>> >
>>>>>>>>> >                     The file has clearly been unzipped and now of course
>>>>>>>>> >                     pip can't install it! What's going on here? Am I
>>>>>>>>> >                     using the direct/portable runner combination wrong?
>>>>>>>>> >
>>>>>>>>> >                     Thanks!
>>>>>>>>> >
>>>>>>>>> >                     --
>>>>>>>>> >                     Eugene Kirpichov
>>>>>>>>> >                     http://www.linkedin.com/in/eugenekirpichov
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >             --
>>>>>>>>> >             Eugene Kirpichov
>>>>>>>>> >             http://www.linkedin.com/in/eugenekirpichov
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >         --
>>>>>>>>> >         Eugene Kirpichov
>>>>>>>>> >         http://www.linkedin.com/in/eugenekirpichov
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > --
>>>>>>>>> > Eugene Kirpichov
>>>>>>>>> > http://www.linkedin.com/in/eugenekirpichov
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Eugene Kirpichov
>>>>>>>> http://www.linkedin.com/in/eugenekirpichov
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Eugene Kirpichov
>>>>>>> http://www.linkedin.com/in/eugenekirpichov
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Eugene Kirpichov
>>>>>> http://www.linkedin.com/in/eugenekirpichov
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Eugene Kirpichov
>>>>> http://www.linkedin.com/in/eugenekirpichov
>>>>
>>>>
>>>>
>>>> --
>>>> Eugene Kirpichov
>>>> http://www.linkedin.com/in/eugenekirpichov
>>>
>>>
>>>
>>> --
>>> Eugene Kirpichov
>>> http://www.linkedin.com/in/eugenekirpichov

Re: Staged PIP package mysteriously ungzipped, non-installable inside the worker

Posted by Daniel Oliveira <da...@google.com>.
Normally I'd say not to cherry-pick this, since the issue affects only one
runner and isn't really a regression. But given that this is the last Py2
release and no follow-up release will be able to carry this fix, I think
it's worth making an exception this time. There should be at least one
release with a working portable runner for Py2 users, especially since the
portable runner appears in examples on the website. Plus, I'm already
waiting on some other cherry-picks, so this won't delay anything.

So yes: Eugene, if you create a cherry-pick of this change into the
release-2.24.0 branch, I'll review and merge it.
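
(In case it's useful, the cherry-pick flow is roughly the sketch below; the
fork remote, branch name, and commit hash are placeholders for wherever the
merged fix landed:)

$ git fetch origin
$ git checkout -b cp-gzip-artifact-fix origin/release-2.24.0
$ git cherry-pick <sha-of-the-merged-fix-commit>
$ git push <your-fork> cp-gzip-artifact-fix
# then open a PR targeting the release-2.24.0 branch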

On Mon, Aug 17, 2020 at 11:27 AM Valentyn Tymofieiev <va...@google.com>
wrote:

> Will defer to the release manager; one reason to cherry-pick is
> that 2.24.0 will be the last release with Python 2 support, so Py2 users of
> Portable Python Local Runner might appreciate the fix, since they won't be
> able to use the next release.

Re: Staged PIP package mysteriously ungzipped, non-installable inside the worker

Posted by Valentyn Tymofieiev <va...@google.com>.
Will defer to the release manager; one reason to cherry-pick is that 2.24.0
will be the last release with Python 2 support, so Py2 users of Portable
Python Local Runner might appreciate the fix, since they won't be able to
use the next release.

On Thu, Aug 13, 2020 at 6:28 PM Eugene Kirpichov <ek...@gmail.com>
wrote:

> +Daniel as in charge of 2.24 per dev@ thread.
>

Re: Staged PIP package mysteriously ungzipped, non-installable inside the worker

Posted by Eugene Kirpichov <ek...@gmail.com>.
+Daniel as in charge of 2.24 per dev@ thread.

On Thu, Aug 13, 2020 at 6:24 PM Eugene Kirpichov <ek...@gmail.com>
wrote:

> The PR is merged.
>
> Do folks think this warrants being cherry-picked into v2.24? My hunch is
> yes, because one of the runners (the local portable Python runner) is
> effectively broken for any production workload: it works only if your
> pipeline has no dependencies.


-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov

Re: Staged PIP package mysteriously ungzipped, non-installable inside the worker

Posted by Eugene Kirpichov <ek...@gmail.com>.
The PR is merged.

Do folks think this warrants being cherry-picked into v2.24? My hunch is
yes, because one of the runners (the local portable Python runner) is
effectively broken for any production workload (it works only if your
pipeline has no dependencies).

On Thu, Aug 13, 2020 at 12:56 PM Eugene Kirpichov <ek...@gmail.com>
wrote:

> FWIW I sent a PR to fix this https://github.com/apache/beam/pull/12571
>
> However, I'm not up to date on the portable test infrastructure and would
> appreciate guidance on what tests I can add for this.
>


-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov

Re: Staged PIP package mysteriously ungzipped, non-installable inside the worker

Posted by Eugene Kirpichov <ek...@gmail.com>.
FWIW I sent a PR to fix this https://github.com/apache/beam/pull/12571

However, I'm not up to date on the portable test infrastructure and would
appreciate guidance on what tests I can add for this.

On Tue, Aug 11, 2020 at 5:28 PM Eugene Kirpichov <ek...@gmail.com>
wrote:

> (FYI Sam +sbrother@gmail.com <sb...@gmail.com>)
>


-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov

Re: Staged PIP package mysteriously ungzipped, non-installable inside the worker

Posted by Eugene Kirpichov <ek...@gmail.com>.
(FYI Sam +sbrother@gmail.com <sb...@gmail.com>)

On Tue, Aug 11, 2020 at 5:00 PM Eugene Kirpichov <ek...@gmail.com>
wrote:

> Ok I found the bug, and now I don't understand how it could have possibly
> ever worked. And if this was never tested, then I don't understand why it
> works after fixing this one bug :)
>
> Basically the Python ArtifactStaging/RetrievalService uses
> FileSystems.open() to read the artifacts to be staged, and
> FileSystems.open() by default decompresses compressed files based on their
> extension.
> I found two such services - one in Python and one in Java. Is the Python
> one used with an embedded job endpoint and the Java one otherwise? I
> haven't inspected the Java one, but fixing the Python one does the trick.


-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov

Re: Staged PIP package mysteriously ungzipped, non-installable inside the worker

Posted by Eugene Kirpichov <ek...@gmail.com>.
Ok I found the bug, and now I don't understand how it could have possibly
ever worked. And if this was never tested, then I don't understand why it
works after fixing this one bug :)

Basically the Python ArtifactStaging/RetrievalService uses
FileSystems.open() to read the artifacts to be staged, and
FileSystems.open() by default decompresses compressed files based on their
extension.
I found two such services - one in Python and one in Java. Is the Python
one used with an embedded job endpoint and the Java one otherwise? I
haven't inspected the Java one, but fixing the Python one does the trick.

The fix is this patch:

diff --git
a/sdks/python/apache_beam/runners/portability/artifact_service.py
b/sdks/python/apache_beam/runners/portability/artifact_service.py
index f2bbf534c3..1f3ec1c0b0 100644
--- a/sdks/python/apache_beam/runners/portability/artifact_service.py
+++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
@@ -41,6 +41,7 @@ import grpc
 from future.moves.urllib.request import urlopen

 from apache_beam.io import filesystems
+from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_artifact_api_pb2
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
@@ -263,7 +264,8 @@ class BeamFilesystemHandler(object):
     self._root = root

   def file_reader(self, path):
-    return filesystems.FileSystems.open(path)
+    return filesystems.FileSystems.open(
+        path, compression_type=CompressionTypes.UNCOMPRESSED)

   def file_writer(self, name=None):
     full_path = filesystems.FileSystems.join(self._root, name)
diff --git
a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
index 5bf3282250..2684235be0 100644
---
a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
+++
b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
@@ -45,6 +45,7 @@ from typing import overload
 import grpc

 from apache_beam.io import filesystems
+from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.portability import common_urns
 from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
@@ -464,9 +465,13 @@ class GrpcServer(object):
                 self.provision_info.provision_info, worker_manager),
             self.control_server)

+      def open_uncompressed(f):
+        return filesystems.FileSystems.open(
+            f, compression_type=CompressionTypes.UNCOMPRESSED)
+

 beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
           artifact_service.ArtifactRetrievalService(
-              file_reader=filesystems.FileSystems.open),
+              file_reader=open_uncompressed),
           self.control_server)

     self.data_plane_handler = data_plane.BeamFnDataServicer(
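
(For anyone following along, here is a minimal standalone sketch of the
behavior the patch works around. It is illustrative only, not part of the
patch, and the artifact path below is made up - with the default
compression_type, FileSystems.open() infers gzip from the ".gz" extension
and hands back an already-decompressed stream.)

# Point this at any local .tar.gz to try it; the path is hypothetical.
from apache_beam.io import filesystems
from apache_beam.io.filesystems import CompressionTypes

path = '/tmp/staged/alembic-1.4.2.tar.gz'

# Default (CompressionTypes.AUTO): Beam transparently gunzips based on
# the extension, so the reader sees a plain tar stream rather than the
# original gzip bytes.
with filesystems.FileSystems.open(path) as f:
    print(f.read(2) == b'\x1f\x8b')  # False - gzip magic bytes are gone

# CompressionTypes.UNCOMPRESSED: the bytes pass through verbatim, which
# is what artifact staging/retrieval needs.
with filesystems.FileSystems.open(
    path, compression_type=CompressionTypes.UNCOMPRESSED) as f:
    print(f.read(2) == b'\x1f\x8b')  # True - still a valid gzip file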



On Tue, Aug 11, 2020 at 1:43 PM Eugene Kirpichov <ek...@gmail.com>
wrote:

> Hi Maximilian,
>
> Thank you - it works fine with the embedded Flink runner (per below, seems
> like it's not using Docker for running Python code? What is it using then?).
>
> However, the original bug appears to be wider than I thought - it is also
> present if I run --runner=FlinkRunner --environment_type=DOCKER. Seems like
> something is very broken in local Docker execution in general - I haven't
> yet verified whether the same error will happen when running on a remote
> Flink cluster.
>
> Trying to build my own SDK containers with some more debugging so I can
> figure out what's going on...
>
>
> On Tue, Aug 11, 2020 at 7:53 AM Maximilian Michels <mx...@apache.org> wrote:
>
>> Looks like you ran into a bug.
>>
>> You could just run your program without specifying any arguments, since
>> running with Python's FnApiRunner should be enough.
>>
>> Alternatively, how about trying to run the same pipeline with the
>> FlinkRunner? Use: --runner=FlinkRunner and do not specify an endpoint.
>> It will run the Python code embedded (loopback environment) without
>> additional containers.
>>
>> Cheers,
>> Max
>>
>> On 10.08.20 21:59, Eugene Kirpichov wrote:
>> > Thanks Valentyn!
>> >
>> > Good to know that this is a bug (I'll file a bug), and that Dataflow
>> has
>> > an experimental way to use custom containers. I'll try that.
>> >
>> > On Mon, Aug 10, 2020 at 12:51 PM Valentyn Tymofieiev
>> > <valentyn@google.com <ma...@google.com>> wrote:
>> >
>> >     Hi Eugene,
>> >
>> >     Good to hear from you. The experience you are describing on Portable
>> >     Runner + Docker container in local execution mode is most certainly
>> >     a bug, if you have not opened an issue on it, please do so and feel
>> >     free to cc me.
>> >
>> >     I can also reproduce the bug and likewise didn't see anything
>> >     obvious immediately, this needs some debugging.
>> >
>> >     cc: +Ankur Goenka <ma...@google.com> +Kyle Weaver
>> >     <ma...@google.com> who recently worked on Portable Runner
>> >     and may be interested.
>> >
>> >     By the way, you should be able to use custom containers with
>> >     Dataflow, if you set --experiments=use_runner_v2.
>> >
>> >     On Mon, Aug 10, 2020 at 9:06 AM Eugene Kirpichov
>> >     <ekirpichov@gmail.com <ma...@gmail.com>> wrote:
>> >
>> >         (cc'ing Sam with whom I'm working on this atm)
>> >


-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov

Re: Staged PIP package mysteriously ungzipped, non-installable inside the worker

Posted by Eugene Kirpichov <ek...@gmail.com>.
Hi Maximilian,

Thank you - it works fine with the embedded Flink runner (per below, it
seems it's not using Docker to run the Python code? What is it using, then?).

However, the original bug appears to be wider than I thought: it is also
present if I run with --runner=FlinkRunner --environment_type=DOCKER. It
seems something is very broken in local Docker execution in general; I
haven't yet verified whether the same error happens when running on a remote
Flink cluster.
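
For the record, the Docker-mode repro is something like the following (same
main.py and requirements.txt as in my first mail):

$ python3 main.py --runner=FlinkRunner --environment_type=DOCKER --requirements_file=requirements.txt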

I'm trying to build my own SDK containers with some more debugging so I can
figure out what's going on...




-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov

Re: Staged PIP package mysteriously ungzipped, non-installable inside the worker

Posted by Maximilian Michels <mx...@apache.org>.
Looks like you ran into a bug.

You could just run your program without specifying any arguments, since 
running with Python's FnApiRunner should be enough.

Alternatively, how about trying to run the same pipeline with the 
FlinkRunner? Use: --runner=FlinkRunner and do not specify an endpoint. 
It will run the Python code embedded (loopback environment) without 
additional containers.
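
Concretely, a sketch of the two invocations (using the main.py from your
first mail, with no job endpoint given):

$ python3 main.py
$ python3 main.py --runner=FlinkRunner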

Cheers,
Max


Re: Staged PIP package mysteriously ungzipped, non-installable inside the worker

Posted by Eugene Kirpichov <ek...@gmail.com>.
Thanks Valentyn!

Good to know that this is a bug (I'll file one) and that Dataflow has an
experimental way to use custom containers. I'll try that.


-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov

Re: Staged PIP package mysteriously ungzipped, non-installable inside the worker

Posted by Valentyn Tymofieiev <va...@google.com>.
Hi Eugene,

Good to hear from you. The experience you are describing on the Portable
Runner + Docker container in local execution mode is most certainly a bug.
If you have not opened an issue for it, please do so and feel free to cc me.

I can also reproduce the bug, and likewise didn't see anything obvious
immediately; this needs some debugging.

cc: +Ankur Goenka <go...@google.com> +Kyle Weaver <kc...@google.com>, who
recently worked on the Portable Runner and may be interested.

By the way, you should be able to use custom containers with Dataflow if
you set --experiments=use_runner_v2.
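
For example, something along these lines (a sketch; the project, bucket,
and image values are placeholders, and I'm assuming the usual
--worker_harness_container_image flag for the custom image):

$ python3 main.py \
    --runner=DataflowRunner \
    --project=<your-project> --region=us-central1 \
    --temp_location=gs://<your-bucket>/tmp \
    --experiments=use_runner_v2 \
    --worker_harness_container_image=gcr.io/<your-project>/beam-worker:latest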


Re: Staged PIP package mysteriously ungzipped, non-installable inside the worker

Posted by Eugene Kirpichov <ek...@gmail.com>.
(cc'ing Sam with whom I'm working on this atm)

FWIW I'm still stumped. I've looked through the Python, Go, and Java code in
the Beam repo that has anything to do with gzipping/unzipping, and none of it
appears to be used in the artifact staging/retrieval codepaths. I also can't
find any mention of compression/decompression in the container boot code. My
next step will be to add a bunch of debugging, rebuild the containers, and
see what the artifact services think they're serving.
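
For anyone following along, here is roughly how I get a shell in the dead
worker container (per the "stopped containers" link in my first mail);
beam-debug is just a scratch image tag:

$ docker ps -a                                # find the exited SDK worker
$ docker commit <container-id> beam-debug     # snapshot its filesystem
$ docker run --rm -it --entrypoint /bin/sh beam-debug
# file /tmp/staged/alembic-1.4.2.tar.gz
# ls -l /tmp/staged/alembic-1.4.2.tar.gz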




-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov

Re: Staged PIP package mysteriously ungzipped, non-installable inside the worker

Posted by Eugene Kirpichov <ek...@gmail.com>.
Thanks Austin! Good stuff - though note that I am *not* using custom
containers; I'm just trying to get the basics to work: a Python pipeline
with a simple requirements.txt file. It feels like this should work
out-of-the-box, so I must be doing something wrong.


-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov

Re: Staged PIP package mysteriously ungzipped, non-installable inside the worker

Posted by Austin Bennett <wh...@gmail.com>.
I believe only @OrielResearch Eila Arich-Landkof
<ei...@orielresearch.org> is potentially doing applied work with custom
containers (there must be others)!

As a plug for her and @BeamSummit: I think enough related material will be
covered (with Conda specifics) in -->
https://2020.beamsummit.org/sessions/workshop-using-conda-on-beam/

I'm sure others will have more genuinely helpful things to say, on-list,
before that occurs (~3 weeks).


