You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2020/08/19 23:40:51 UTC

[beam] branch release-2.24.0 updated: [BEAM-10702, BEAM-10757] Cherrypick #12571: Do not implicitly decompress artifacts (#12619)

This is an automated email from the ASF dual-hosted git repository.

danoliveira pushed a commit to branch release-2.24.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.24.0 by this push:
     new 33d2437  [BEAM-10702, BEAM-10757] Cherrypick #12571: Do not implicitly decompress artifacts (#12619)
33d2437 is described below

commit 33d24372098a50f822a20416091f71287caf2917
Author: Eugene Kirpichov <ek...@gmail.com>
AuthorDate: Wed Aug 19 16:40:34 2020 -0700

    [BEAM-10702, BEAM-10757] Cherrypick #12571: Do not implicitly decompress artifacts (#12619)
    
    * [BEAM-10702] Do not implicitly decompress artifacts
    
    Also adds a Julia set test on portable local runner, which uses a
    setup.py and hence exercises the artifact staging codepath.
    
    This is a squashed cherrypick of https://github.com/apache/beam/pull/12571
    
    * Add enum34 to manual_licenses
---
 CHANGES.md                                         |  3 +-
 .../runners/portability/artifact_service.py        |  4 ++-
 .../portability/fn_api_runner/worker_handlers.py   |  7 ++++-
 .../container/license_scripts/dep_urls_py.yaml     |  4 ++-
 .../license_scripts/manual_licenses/enum34/LICENSE | 32 ++++++++++++++++++++++
 sdks/python/test-suites/portable/common.gradle     | 25 +++++++++++++++++
 6 files changed, 70 insertions(+), 5 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index a4882b3..45e29c9 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -63,16 +63,15 @@
   you may use `--experiments=use_legacy_bq_sink`.
 * Add cross-language support to Java's JdbcIO, now available in the Python module `apache_beam.io.external.jdbc` ([BEAM-10135](https://issues.apache.org/jira/browse/BEAM-10135), [BEAM-10136](https://issues.apache.org/jira/browse/BEAM-10136)).
 * Add support of AWS SDK v2 for KinesisIO.Read (Java) ([BEAM-9702](https://issues.apache.org/jira/browse/BEAM-9702)).
-* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * Add streaming support to SnowflakeIO in Java SDK ([BEAM-9896](https://issues.apache.org/jira/browse/BEAM-9896  ))
 * Support reading and writing to Google Healthcare DICOM APIs in Python SDK ([BEAM-10601](https://issues.apache.org/jira/browse/BEAM-10601))
 
 ## New Features / Improvements
 
 * Shared library for simplifying management of large shared objects added to Python SDK. Example use case is sharing a large TF model object across threads ([BEAM-10417](https://issues.apache.org/jira/browse/BEAM-10417)).
-* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle ([BEAM-8543](https://issues.apache.org/jira/browse/BEAM-8543)).
 * OnTimerContext should not create a new one when processing each element/timer in FnApiDoFnRunner ([BEAM-9839](https://issues.apache.org/jira/browse/BEAM-9839))
+* Fixed BEAM-10702 (Python) - Python portable runner with embedded job endpoint was unable to run pipelines which had dependencies in setup.py or requirements.txt ([BEAM-10702](https://issues.apache.org/jira/browse/BEAM-10702)).
 
 ## Breaking Changes
 
diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py
index f2bbf53..1f3ec1c 100644
--- a/sdks/python/apache_beam/runners/portability/artifact_service.py
+++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
@@ -41,6 +41,7 @@ import grpc
 from future.moves.urllib.request import urlopen
 
 from apache_beam.io import filesystems
+from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_artifact_api_pb2
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
@@ -263,7 +264,8 @@ class BeamFilesystemHandler(object):
     self._root = root
 
   def file_reader(self, path):
-    return filesystems.FileSystems.open(path)
+    return filesystems.FileSystems.open(
+        path, compression_type=CompressionTypes.UNCOMPRESSED)
 
   def file_writer(self, name=None):
     full_path = filesystems.FileSystems.join(self._root, name)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
index 9d27549..1f1d483 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
@@ -45,6 +45,7 @@ from typing import overload
 import grpc
 
 from apache_beam.io import filesystems
+from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.portability import common_urns
 from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
@@ -464,9 +465,13 @@ class GrpcServer(object):
                 self.provision_info.provision_info, worker_manager),
             self.control_server)
 
+      def open_uncompressed(f):
+        return filesystems.FileSystems.open(
+            f, compression_type=CompressionTypes.UNCOMPRESSED)
+
       beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
           artifact_service.ArtifactRetrievalService(
-              file_reader=filesystems.FileSystems.open),
+              file_reader=open_uncompressed),
           self.control_server)
 
     self.data_plane_handler = data_plane.BeamFnDataServicer(
diff --git a/sdks/python/container/license_scripts/dep_urls_py.yaml b/sdks/python/container/license_scripts/dep_urls_py.yaml
index d1e194f..13aa4cf 100644
--- a/sdks/python/container/license_scripts/dep_urls_py.yaml
+++ b/sdks/python/container/license_scripts/dep_urls_py.yaml
@@ -48,7 +48,9 @@ pip_dependencies:
   cython:
     license: "https://raw.githubusercontent.com/cython/cython/master/LICENSE.txt"
   enum34:
-    license: "https://bitbucket.org/stoneleaf/enum34/raw/c208549a93b71b948ff7bbdfd29dce8f85527916/enum/LICENSE"
+    # The original repo is down. This license was taken from a clone of it:
+    # https://github.com/jamespharaoh/python-enum34/blob/master/enum/LICENSE
+    license: "file:///tmp/license_scripts/manual_licenses/enum34/LICENSE"
   fastavro:
     license: "https://raw.githubusercontent.com/fastavro/fastavro/master/LICENSE"
     notice: "https://raw.githubusercontent.com/fastavro/fastavro/master/NOTICE.txt"
diff --git a/sdks/python/container/license_scripts/manual_licenses/enum34/LICENSE b/sdks/python/container/license_scripts/manual_licenses/enum34/LICENSE
new file mode 100644
index 0000000..9003b88
--- /dev/null
+++ b/sdks/python/container/license_scripts/manual_licenses/enum34/LICENSE
@@ -0,0 +1,32 @@
+Copyright (c) 2013, Ethan Furman.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+
+    Redistributions of source code must retain the above
+    copyright notice, this list of conditions and the
+    following disclaimer.
+
+    Redistributions in binary form must reproduce the above
+    copyright notice, this list of conditions and the following
+    disclaimer in the documentation and/or other materials
+    provided with the distribution.
+
+    Neither the name Ethan Furman nor the names of any
+    contributors may be used to endorse or promote products
+    derived from this software without specific prior written
+    permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle
index 48312a6..60f825c 100644
--- a/sdks/python/test-suites/portable/common.gradle
+++ b/sdks/python/test-suites/portable/common.gradle
@@ -101,6 +101,30 @@ task flinkTriggerTranscript() {
   }
 }
 
+// Verifies BEAM-10702: runs the Julia set example with a setup.py to exercise artifact staging.
+task portableLocalRunnerJuliaSetWithSetupPy {
+  dependsOn 'setupVirtualenv'
+  dependsOn ":sdks:python:container:py${pythonVersionSuffix}:docker"
+
+  doLast {
+    exec {
+      executable 'sh'
+      args '-c', """
+          . ${envdir}/bin/activate \\
+          && cd ${pythonRootDir} \\
+          && pip install -e . \\
+          && cd apache_beam/examples/complete/juliaset \\
+          && python juliaset_main.py \\
+              --runner=PortableRunner \\
+              --job_endpoint=embed \\
+              --setup_file=./setup.py \\
+              --coordinate_output=/tmp/juliaset \\
+              --grid_size=1
+          """
+    }
+  }
+}
+
 task createProcessWorker {
   dependsOn ':sdks:python:container:build'
   dependsOn 'setupVirtualenv'
@@ -181,6 +205,7 @@ project.task("postCommitPy${pythonVersionSuffix}") {
   dependsOn = ['setupVirtualenv',
                "postCommitPy${pythonVersionSuffix}IT",
                ':runners:spark:job-server:shadowJar',
+               'portableLocalRunnerJuliaSetWithSetupPy',
                'portableWordCountSparkRunnerBatch']
 }