Posted to commits@beam.apache.org by da...@apache.org on 2020/08/19 23:40:51 UTC
[beam] branch release-2.24.0 updated: [BEAM-10702, BEAM-10757] Cherrypick #12571: Do not implicitly decompress artifacts (#12619)
This is an automated email from the ASF dual-hosted git repository.
danoliveira pushed a commit to branch release-2.24.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.24.0 by this push:
new 33d2437 [BEAM-10702, BEAM-10757] Cherrypick #12571: Do not implicitly decompress artifacts (#12619)
33d2437 is described below
commit 33d24372098a50f822a20416091f71287caf2917
Author: Eugene Kirpichov <ek...@gmail.com>
AuthorDate: Wed Aug 19 16:40:34 2020 -0700
[BEAM-10702, BEAM-10757] Cherrypick #12571: Do not implicitly decompress artifacts (#12619)
* [BEAM-10702] Do not implicitly decompress artifacts
Also adds a Julia set test on portable local runner, which uses a
setup.py and hence exercises the artifact staging codepath.
This is a squashed cherrypick of https://github.com/apache/beam/pull/12571
* Add enum34 to manual_licenses
---
CHANGES.md | 3 +-
.../runners/portability/artifact_service.py | 4 ++-
.../portability/fn_api_runner/worker_handlers.py | 7 ++++-
.../container/license_scripts/dep_urls_py.yaml | 4 ++-
.../license_scripts/manual_licenses/enum34/LICENSE | 32 ++++++++++++++++++++++
sdks/python/test-suites/portable/common.gradle | 25 +++++++++++++++++
6 files changed, 70 insertions(+), 5 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index a4882b3..45e29c9 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -63,16 +63,15 @@
   you may use `--experiments=use_legacy_bq_sink`.
 * Add cross-language support to Java's JdbcIO, now available in the Python module `apache_beam.io.external.jdbc` ([BEAM-10135](https://issues.apache.org/jira/browse/BEAM-10135), [BEAM-10136](https://issues.apache.org/jira/browse/BEAM-10136)).
 * Add support of AWS SDK v2 for KinesisIO.Read (Java) ([BEAM-9702](https://issues.apache.org/jira/browse/BEAM-9702)).
-* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * Add streaming support to SnowflakeIO in Java SDK ([BEAM-9896](https://issues.apache.org/jira/browse/BEAM-9896 ))
 * Support reading and writing to Google Healthcare DICOM APIs in Python SDK ([BEAM-10601](https://issues.apache.org/jira/browse/BEAM-10601))
 
 ## New Features / Improvements
 
 * Shared library for simplifying management of large shared objects added to Python SDK. Example use case is sharing a large TF model object across threads ([BEAM-10417](https://issues.apache.org/jira/browse/BEAM-10417)).
-* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle ([BEAM-8543](https://issues.apache.org/jira/browse/BEAM-8543)).
 * OnTimerContext should not create a new one when processing each element/timer in FnApiDoFnRunner ([BEAM-9839](https://issues.apache.org/jira/browse/BEAM-9839))
+* Fixed BEAM-10702 (Python) - Python portable runner with embedded job endpoint was unable to run pipelines which had dependencies in setup.py or requirements.txt ([BEAM-10702](https://issues.apache.org/jira/browse/BEAM-10702)).
 
 ## Breaking Changes
 
diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py
index f2bbf53..1f3ec1c 100644
--- a/sdks/python/apache_beam/runners/portability/artifact_service.py
+++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
@@ -41,6 +41,7 @@ import grpc
 from future.moves.urllib.request import urlopen
 
 from apache_beam.io import filesystems
+from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_artifact_api_pb2
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
@@ -263,7 +264,8 @@ class BeamFilesystemHandler(object):
     self._root = root
 
   def file_reader(self, path):
-    return filesystems.FileSystems.open(path)
+    return filesystems.FileSystems.open(
+        path, compression_type=CompressionTypes.UNCOMPRESSED)
 
   def file_writer(self, name=None):
     full_path = filesystems.FileSystems.join(self._root, name)
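Why the one-line change matters: `FileSystems.open` defaults to `compression_type=CompressionTypes.AUTO`, which infers compression from the file extension. An artifact whose name ends in `.gz` (for example, a source tarball built from `setup.py`) is therefore gunzipped on read, and the client receives different bytes than were staged. The stdlib-only sketch below imitates both modes; `open_artifact`, `serve_both_ways`, `demo` and the file name `package.tar.gz` are hypothetical illustrations, not Beam APIs.

```python
import gzip
import os
import tempfile


def open_artifact(path, auto_decompress):
    """Hypothetical reader; NOT Beam's FileSystems API.

    auto_decompress=True imitates extension-based inference (the idea behind
    CompressionTypes.AUTO): a path ending in ".gz" is transparently gunzipped.
    auto_decompress=False returns the stored bytes untouched, which is what
    CompressionTypes.UNCOMPRESSED asks for.
    """
    if auto_decompress and path.endswith(".gz"):
        return gzip.open(path, "rb")
    return open(path, "rb")


def serve_both_ways(path):
    """Return (bytes on disk, bytes served with inference, bytes served raw)."""
    with open(path, "rb") as f:
        on_disk = f.read()
    with open_artifact(path, auto_decompress=True) as f:
        served_auto = f.read()
    with open_artifact(path, auto_decompress=False) as f:
        served_raw = f.read()
    return on_disk, served_auto, served_raw


def demo():
    """Stage a hypothetical gzipped 'sdist' and serve it both ways."""
    path = os.path.join(tempfile.mkdtemp(), "package.tar.gz")
    payload = b"pretend this is a tar archive"
    with gzip.open(path, "wb") as f:
        f.write(payload)
    # Returns (payload, on_disk, served_auto, served_raw).
    return (payload,) + serve_both_ways(path)
```

In `demo()`, `served_raw` matches the file on disk byte for byte (it still starts with the gzip magic bytes `\x1f\x8b`), while `served_auto` is the inner payload. A worker that then tries to install the artifact as a `.tar.gz` would presumably fail, which is consistent with the symptom described in BEAM-10702.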
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
index 9d27549..1f1d483 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
@@ -45,6 +45,7 @@ from typing import overload
 import grpc
 
 from apache_beam.io import filesystems
+from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.portability import common_urns
 from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
@@ -464,9 +465,13 @@ class GrpcServer(object):
             self.provision_info.provision_info, worker_manager),
         self.control_server)
 
+    def open_uncompressed(f):
+      return filesystems.FileSystems.open(
+          f, compression_type=CompressionTypes.UNCOMPRESSED)
+
     beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
         artifact_service.ArtifactRetrievalService(
-            file_reader=filesystems.FileSystems.open),
+            file_reader=open_uncompressed),
         self.control_server)
 
     self.data_plane_handler = data_plane.BeamFnDataServicer(
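Here the fix lands at the injection site rather than inside the service: as far as this diff shows, `ArtifactRetrievalService` is handed a `file_reader` callable and reads from whatever it returns, so pinning `compression_type` in that callable changes every retrieval. The sketch below models that shape with the stdlib only; `open_file`, `ChunkedArtifactReader`, `fetch` and `write_gz_artifact` are hypothetical stand-ins, not Beam classes or functions. It also shows that `functools.partial` is an equivalent way to write the nested `open_uncompressed` helper.

```python
import functools
import gzip
import os
import tempfile


def open_file(path, compression_type="auto"):
    """Hypothetical opener with a compression knob (not a Beam API)."""
    if compression_type == "auto" and path.endswith(".gz"):
        return gzip.open(path, "rb")
    return open(path, "rb")


class ChunkedArtifactReader:
    """Hypothetical service that streams a file in fixed-size chunks.

    It never chooses a compression mode; it only uses the injected reader.
    """

    def __init__(self, file_reader, chunk_size=8):
        self._file_reader = file_reader
        self._chunk_size = chunk_size

    def get_artifact(self, path):
        with self._file_reader(path) as f:
            while True:
                chunk = f.read(self._chunk_size)
                if not chunk:
                    return
                yield chunk


def open_uncompressed(path):
    # Same shape as the nested helper in the diff: pin the compression type.
    return open_file(path, compression_type="uncompressed")


# Equivalent alternative: bind the keyword argument with functools.partial.
open_uncompressed_partial = functools.partial(
    open_file, compression_type="uncompressed")


def fetch(reader, path):
    """Reassemble the streamed chunks into one bytes object."""
    return b"".join(reader.get_artifact(path))


def write_gz_artifact(payload, name="dep.tar.gz"):
    """Write payload gzipped to a temp file; return (path, bytes on disk)."""
    path = os.path.join(tempfile.mkdtemp(), name)
    with gzip.open(path, "wb") as f:
        f.write(payload)
    with open(path, "rb") as f:
        return path, f.read()
```

A nested `def` and `functools.partial` behave the same here; the nested function arguably reads more clearly next to the constructor call, which may be why the diff uses it.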
diff --git a/sdks/python/container/license_scripts/dep_urls_py.yaml b/sdks/python/container/license_scripts/dep_urls_py.yaml
index d1e194f..13aa4cf 100644
--- a/sdks/python/container/license_scripts/dep_urls_py.yaml
+++ b/sdks/python/container/license_scripts/dep_urls_py.yaml
@@ -48,7 +48,9 @@ pip_dependencies:
   cython:
     license: "https://raw.githubusercontent.com/cython/cython/master/LICENSE.txt"
   enum34:
-    license: "https://bitbucket.org/stoneleaf/enum34/raw/c208549a93b71b948ff7bbdfd29dce8f85527916/enum/LICENSE"
+    # The original repo is down. This license taken from somebody's clone:
+    # https://github.com/jamespharaoh/python-enum34/blob/master/enum/LICENSE
+    license: "file:///tmp/license_scripts/manual_licenses/enum34/LICENSE"
   fastavro:
     license: "https://raw.githubusercontent.com/fastavro/fastavro/master/LICENSE"
     notice: "https://raw.githubusercontent.com/fastavro/fastavro/master/NOTICE.txt"
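The `enum34` entry now points at a `file://` URL for a license copy vendored in this commit instead of the unreachable Bitbucket URL. Assuming the license script fetches each entry with a generic URL opener such as Python's `urllib.request.urlopen` (an assumption: the script itself is not part of this diff), `file://` and `https://` entries can share one code path, because `urlopen` handles both schemes. `fetch_license` and `vendor_and_fetch` below are hypothetical helpers, and the dict stands in for the parsed YAML so no YAML library is needed.

```python
import pathlib
import tempfile
from urllib.request import urlopen


def fetch_license(url):
    """Return the text at url; urlopen accepts file:// as well as https://."""
    with urlopen(url) as resp:
        return resp.read().decode("utf-8")


def vendor_and_fetch(text):
    """Write text to a temp LICENSE file and read it back via a file:// URL.

    Hypothetical: mimics a dep_urls_py.yaml entry whose license is a local file.
    """
    path = pathlib.Path(tempfile.mkdtemp()) / "LICENSE"
    path.write_bytes(text.encode("utf-8"))  # bytes avoid newline translation
    deps = {"enum34": {"license": path.as_uri()}}  # e.g. file:///tmp/.../LICENSE
    return deps, fetch_license(deps["enum34"]["license"])
```

Note that the URL in the YAML is absolute, so whatever consumes it presumably expects the vendored file to exist at that exact path when it runs.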
diff --git a/sdks/python/container/license_scripts/manual_licenses/enum34/LICENSE b/sdks/python/container/license_scripts/manual_licenses/enum34/LICENSE
new file mode 100644
index 0000000..9003b88
--- /dev/null
+++ b/sdks/python/container/license_scripts/manual_licenses/enum34/LICENSE
@@ -0,0 +1,32 @@
+Copyright (c) 2013, Ethan Furman.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+
+ Redistributions of source code must retain the above
+ copyright notice, this list of conditions and the
+ following disclaimer.
+
+ Redistributions in binary form must reproduce the above
+ copyright notice, this list of conditions and the following
+ disclaimer in the documentation and/or other materials
+ provided with the distribution.
+
+ Neither the name Ethan Furman nor the names of any
+ contributors may be used to endorse or promote products
+ derived from this software without specific prior written
+ permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle
index 48312a6..60f825c 100644
--- a/sdks/python/test-suites/portable/common.gradle
+++ b/sdks/python/test-suites/portable/common.gradle
@@ -101,6 +101,30 @@ task flinkTriggerTranscript() {
   }
 }
 
+// Verifies BEAM-10702.
+task portableLocalRunnerJuliaSetWithSetupPy {
+  dependsOn 'setupVirtualenv'
+  dependsOn ":sdks:python:container:py${pythonVersionSuffix}:docker"
+
+  doLast {
+    exec {
+      executable 'sh'
+      args '-c', """
+          . ${envdir}/bin/activate \\
+          && cd ${pythonRootDir} \\
+          && pip install -e . \\
+          && cd apache_beam/examples/complete/juliaset \\
+          && python juliaset_main.py \\
+              --runner=PortableRunner \\
+              --job_endpoint=embed \\
+              --setup_file=./setup.py \\
+              --coordinate_output=/tmp/juliaset \\
+              --grid_size=1
+          """
+    }
+  }
+}
+
 task createProcessWorker {
   dependsOn ':sdks:python:container:build'
   dependsOn 'setupVirtualenv'
@@ -181,6 +205,7 @@ project.task("postCommitPy${pythonVersionSuffix}") {
   dependsOn = ['setupVirtualenv',
                "postCommitPy${pythonVersionSuffix}IT",
                ':runners:spark:job-server:shadowJar',
+               'portableLocalRunnerJuliaSetWithSetupPy',
                'portableWordCountSparkRunnerBatch']
 }
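The new Gradle task runs the Julia set example against the embedded job endpoint with `--setup_file`, which, per the commit message, stages the `setup.py` package as an artifact and so exercises the retrieval code patched above. The command mixes the example's own flags with pipeline flags. A minimal sketch of how such a split commonly works in Beam examples, assuming `juliaset` follows the usual `argparse.parse_known_args` pattern (its source is not part of this diff); `split_args` is a hypothetical helper:

```python
import argparse

# Flags copied verbatim from the Gradle task above.
ARGV = [
    "--runner=PortableRunner",
    "--job_endpoint=embed",
    "--setup_file=./setup.py",
    "--coordinate_output=/tmp/juliaset",
    "--grid_size=1",
]


def split_args(argv):
    """Split argv into the example's own flags and the pipeline flags.

    Assumption: the example defines --grid_size and --coordinate_output
    itself; anything it does not recognize is left for PipelineOptions.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--grid_size", type=int, required=True)
    parser.add_argument("--coordinate_output", required=True)
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args
```

With `ARGV`, `split_args` returns `grid_size=1` and `coordinate_output='/tmp/juliaset'` as known arguments, and forwards `--runner=PortableRunner`, `--job_endpoint=embed` and `--setup_file=./setup.py`, in that order, as the pipeline arguments.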