Posted to commits@beam.apache.org by jk...@apache.org on 2020/08/18 20:12:36 UTC

[beam] branch jkff-cherry-12571 updated (f3c3d2f -> d582acf)

This is an automated email from the ASF dual-hosted git repository.

jkff pushed a change to branch jkff-cherry-12571
in repository https://gitbox.apache.org/repos/asf/beam.git.


 discard f3c3d2f  [BEAM-10702] Do not implicitly decompress artifacts
     new d582acf  [BEAM-10702] Do not implicitly decompress artifacts

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (f3c3d2f)
            \
             N -- N -- N   refs/heads/jkff-cherry-12571 (d582acf)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.md | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)


[beam] 01/01: [BEAM-10702] Do not implicitly decompress artifacts

Posted by jk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch jkff-cherry-12571
in repository https://gitbox.apache.org/repos/asf/beam.git

commit d582acf51955539c365e454f60868a2abba7ab57
Author: Eugene Kirpichov <ek...@gmail.com>
AuthorDate: Thu Aug 13 12:06:55 2020 -0700

    [BEAM-10702] Do not implicitly decompress artifacts
    
    Also adds a Julia set test on portable local runner, which uses a
    setup.py and hence exercises the artifact staging codepath.
    
    This is a squashed cherrypick of https://github.com/apache/beam/pull/12571
---
 CHANGES.md                                         |  3 +--
 .../runners/portability/artifact_service.py        |  4 +++-
 .../portability/fn_api_runner/worker_handlers.py   |  7 +++++-
 sdks/python/test-suites/portable/common.gradle     | 25 ++++++++++++++++++++++
 4 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index a4882b3..45e29c9 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -63,16 +63,15 @@
   you may use `--experiments=use_legacy_bq_sink`.
 * Add cross-language support to Java's JdbcIO, now available in the Python module `apache_beam.io.external.jdbc` ([BEAM-10135](https://issues.apache.org/jira/browse/BEAM-10135), [BEAM-10136](https://issues.apache.org/jira/browse/BEAM-10136)).
 * Add support of AWS SDK v2 for KinesisIO.Read (Java) ([BEAM-9702](https://issues.apache.org/jira/browse/BEAM-9702)).
-* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * Add streaming support to SnowflakeIO in Java SDK ([BEAM-9896](https://issues.apache.org/jira/browse/BEAM-9896  ))
 * Support reading and writing to Google Healthcare DICOM APIs in Python SDK ([BEAM-10601](https://issues.apache.org/jira/browse/BEAM-10601))
 
 ## New Features / Improvements
 
 * Shared library for simplifying management of large shared objects added to Python SDK. Example use case is sharing a large TF model object across threads ([BEAM-10417](https://issues.apache.org/jira/browse/BEAM-10417)).
-* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle ([BEAM-8543](https://issues.apache.org/jira/browse/BEAM-8543)).
 * OnTimerContext should not create a new one when processing each element/timer in FnApiDoFnRunner ([BEAM-9839](https://issues.apache.org/jira/browse/BEAM-9839))
+* Fixed BEAM-10702 (Python) - Python portable runner with embedded job endpoint was unable to run pipelines which had dependencies in setup.py or requirements.txt ([BEAM-10702](https://issues.apache.org/jira/browse/BEAM-10702)).
 
 ## Breaking Changes
 
diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py
index f2bbf53..1f3ec1c 100644
--- a/sdks/python/apache_beam/runners/portability/artifact_service.py
+++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
@@ -41,6 +41,7 @@ import grpc
 from future.moves.urllib.request import urlopen
 
 from apache_beam.io import filesystems
+from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_artifact_api_pb2
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
@@ -263,7 +264,8 @@ class BeamFilesystemHandler(object):
     self._root = root
 
   def file_reader(self, path):
-    return filesystems.FileSystems.open(path)
+    return filesystems.FileSystems.open(
+        path, compression_type=CompressionTypes.UNCOMPRESSED)
 
   def file_writer(self, name=None):
     full_path = filesystems.FileSystems.join(self._root, name)
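
The file_reader change above is the core of the fix: FileSystems.open defaults to
compression_type=CompressionTypes.AUTO, which picks a decompressor from the file
extension, so a staged artifact such as a .tar.gz sdist was being handed out as
decompressed bytes rather than the original file. A minimal standalone sketch of
the difference, assuming a hypothetical local artifact path (not part of this change):

    from apache_beam.io import filesystems
    from apache_beam.io.filesystems import CompressionTypes

    path = '/tmp/staging/workflow.tar.gz'  # hypothetical staged artifact

    # Default: AUTO infers gzip from the ".gz" suffix and yields decompressed bytes.
    f = filesystems.FileSystems.open(path)
    decompressed = f.read()
    f.close()

    # UNCOMPRESSED returns the artifact byte-for-byte, which is what the
    # artifact retrieval codepath needs to serve.
    f = filesystems.FileSystems.open(
        path, compression_type=CompressionTypes.UNCOMPRESSED)
    raw = f.read()
    f.close()
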
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
index 9d27549..1f1d483 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
@@ -45,6 +45,7 @@ from typing import overload
 import grpc
 
 from apache_beam.io import filesystems
+from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.portability import common_urns
 from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
@@ -464,9 +465,13 @@ class GrpcServer(object):
                 self.provision_info.provision_info, worker_manager),
             self.control_server)
 
+      def open_uncompressed(f):
+        return filesystems.FileSystems.open(
+            f, compression_type=CompressionTypes.UNCOMPRESSED)
+
       beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
           artifact_service.ArtifactRetrievalService(
-              file_reader=filesystems.FileSystems.open),
+              file_reader=open_uncompressed),
           self.control_server)
 
     self.data_plane_handler = data_plane.BeamFnDataServicer(
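
Both changed call sites route artifact reads through the same kind of opener:
ArtifactRetrievalService simply takes a file_reader callable with FileSystems.open's
signature. A standalone sketch of that wiring, with an illustrative gRPC server
setup that is not part of this change:

    from concurrent import futures

    import grpc

    from apache_beam.io import filesystems
    from apache_beam.io.filesystems import CompressionTypes
    from apache_beam.portability.api import beam_artifact_api_pb2_grpc
    from apache_beam.runners.portability import artifact_service

    def open_uncompressed(path):
        # Serve staged artifacts byte-for-byte; no extension-based decompression.
        return filesystems.FileSystems.open(
            path, compression_type=CompressionTypes.UNCOMPRESSED)

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
    beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
        artifact_service.ArtifactRetrievalService(file_reader=open_uncompressed),
        server)
    server.add_insecure_port('localhost:0')
    server.start()
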
diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle
index 48312a6..60f825c 100644
--- a/sdks/python/test-suites/portable/common.gradle
+++ b/sdks/python/test-suites/portable/common.gradle
@@ -101,6 +101,30 @@ task flinkTriggerTranscript() {
   }
 }
 
+// Verifies BEAM-10702.
+task portableLocalRunnerJuliaSetWithSetupPy {
+  dependsOn 'setupVirtualenv'
+  dependsOn ":sdks:python:container:py${pythonVersionSuffix}:docker"
+
+  doLast {
+    exec {
+      executable 'sh'
+      args '-c', """
+          . ${envdir}/bin/activate \\
+          && cd ${pythonRootDir} \\
+          && pip install -e . \\
+          && cd apache_beam/examples/complete/juliaset \\
+          && python juliaset_main.py \\
+              --runner=PortableRunner \\
+              --job_endpoint=embed \\
+              --setup_file=./setup.py \\
+              --coordinate_output=/tmp/juliaset \\
+              --grid_size=1
+          """
+    }
+  }
+}
+
 task createProcessWorker {
   dependsOn ':sdks:python:container:build'
   dependsOn 'setupVirtualenv'
@@ -181,6 +205,7 @@ project.task("postCommitPy${pythonVersionSuffix}") {
   dependsOn = ['setupVirtualenv',
                "postCommitPy${pythonVersionSuffix}IT",
                ':runners:spark:job-server:shadowJar',
+               'portableLocalRunnerJuliaSetWithSetupPy',
                'portableWordCountSparkRunnerBatch']
 }