You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2020/08/18 20:07:34 UTC

[beam] branch jkff-cherry-12571 created (now f3c3d2f)

This is an automated email from the ASF dual-hosted git repository.

jkff pushed a change to branch jkff-cherry-12571
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at f3c3d2f  [BEAM-10702] Do not implicitly decompress artifacts

This branch includes the following new commits:

     new f3c3d2f  [BEAM-10702] Do not implicitly decompress artifacts

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: [BEAM-10702] Do not implicitly decompress artifacts

Posted by jk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch jkff-cherry-12571
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f3c3d2f06ceba2b4379187278ae7b8b50f0d528f
Author: Eugene Kirpichov <ek...@gmail.com>
AuthorDate: Thu Aug 13 12:06:55 2020 -0700

    [BEAM-10702] Do not implicitly decompress artifacts
    
    Also adds a Julia set test on the portable local runner; the test uses
    a setup.py and hence exercises the artifact staging codepath.
    
    This is a squashed cherrypick of https://github.com/apache/beam/pull/12571
---
 .../runners/portability/artifact_service.py        |  4 +++-
 .../portability/fn_api_runner/worker_handlers.py   |  7 +++++-
 sdks/python/test-suites/portable/common.gradle     | 25 ++++++++++++++++++++++
 3 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py
index f2bbf53..1f3ec1c 100644
--- a/sdks/python/apache_beam/runners/portability/artifact_service.py
+++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
@@ -41,6 +41,7 @@ import grpc
 from future.moves.urllib.request import urlopen
 
 from apache_beam.io import filesystems
+from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_artifact_api_pb2
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
@@ -263,7 +264,8 @@ class BeamFilesystemHandler(object):
     self._root = root
 
   def file_reader(self, path):
-    return filesystems.FileSystems.open(path)
+    return filesystems.FileSystems.open(
+        path, compression_type=CompressionTypes.UNCOMPRESSED)
 
   def file_writer(self, name=None):
     full_path = filesystems.FileSystems.join(self._root, name)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
index 9d27549..1f1d483 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
@@ -45,6 +45,7 @@ from typing import overload
 import grpc
 
 from apache_beam.io import filesystems
+from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.portability import common_urns
 from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
@@ -464,9 +465,13 @@ class GrpcServer(object):
                 self.provision_info.provision_info, worker_manager),
             self.control_server)
 
+      def open_uncompressed(f):
+        return filesystems.FileSystems.open(
+            f, compression_type=CompressionTypes.UNCOMPRESSED)
+
       beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
           artifact_service.ArtifactRetrievalService(
-              file_reader=filesystems.FileSystems.open),
+              file_reader=open_uncompressed),
           self.control_server)
 
     self.data_plane_handler = data_plane.BeamFnDataServicer(
diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle
index 48312a6..60f825c 100644
--- a/sdks/python/test-suites/portable/common.gradle
+++ b/sdks/python/test-suites/portable/common.gradle
@@ -101,6 +101,30 @@ task flinkTriggerTranscript() {
   }
 }
 
+// Verifies BEAM-10702.
+task portableLocalRunnerJuliaSetWithSetupPy {
+  dependsOn 'setupVirtualenv'
+  dependsOn ":sdks:python:container:py${pythonVersionSuffix}:docker"
+
+  doLast {
+    exec {
+      executable 'sh'
+      args '-c', """
+          . ${envdir}/bin/activate \\
+          && cd ${pythonRootDir} \\
+          && pip install -e . \\
+          && cd apache_beam/examples/complete/juliaset \\
+          && python juliaset_main.py \\
+              --runner=PortableRunner \\
+              --job_endpoint=embed \\
+              --setup_file=./setup.py \\
+              --coordinate_output=/tmp/juliaset \\
+              --grid_size=1
+          """
+    }
+  }
+}
+
 task createProcessWorker {
   dependsOn ':sdks:python:container:build'
   dependsOn 'setupVirtualenv'
@@ -181,6 +205,7 @@ project.task("postCommitPy${pythonVersionSuffix}") {
   dependsOn = ['setupVirtualenv',
                "postCommitPy${pythonVersionSuffix}IT",
                ':runners:spark:job-server:shadowJar',
+               'portableLocalRunnerJuliaSetWithSetupPy',
                'portableWordCountSparkRunnerBatch']
 }