Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/07/01 00:13:00 UTC

[jira] [Work logged] (BEAM-11275) Support GCS files for extra_requirements argument in Python Beam portable runners

     [ https://issues.apache.org/jira/browse/BEAM-11275?focusedWorklogId=617354&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-617354 ]

ASF GitHub Bot logged work on BEAM-11275:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Jul/21 00:12
            Start Date: 01/Jul/21 00:12
    Worklog Time Spent: 10m 
      Work Description: ibzib commented on a change in pull request #15105:
URL: https://github.com/apache/beam/pull/15105#discussion_r661884341



##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -526,6 +534,14 @@ def _create_jar_packages(jar_packages, temp_dir):
 
     return resources
 
+  @staticmethod
+  def _should_download_remote_extra_packages(package):

Review comment:
       I think maybe the best policy is to just never download remote packages here. If the user wanted to download packages, I figure they could just do it themselves and pass them in as local files. WDYT?
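Under the policy suggested above, a user who still wants the stager to upload a package could fetch it first and pass the local copy. Below is a minimal sketch of that idea. `localize_package` is a hypothetical helper, not part of Beam. `open_fn` is injected and is assumed to take a path and return a readable binary file-like object; in a pipeline it would presumably be `apache_beam.io.filesystems.FileSystems.open`.

```python
import os
import posixpath
import shutil
from urllib.parse import urlparse


def localize_package(path, dest_dir, open_fn):
    """Copy a remote package into dest_dir and return the local path.

    Hypothetical helper: `open_fn` is assumed to accept a path and return a
    readable binary file-like object (e.g. FileSystems.open in Beam).
    """
    # Use the last component of the remote path as the local file name.
    file_name = posixpath.basename(urlparse(path).path)
    if not file_name:
        raise ValueError('Path does not name a file: %s' % path)
    local_path = os.path.join(dest_dir, file_name)
    src = open_fn(path)
    try:
        with open(local_path, 'wb') as dst:
            # Stream the contents instead of reading them all into memory.
            shutil.copyfileobj(src, dst)
    finally:
        src.close()
    return local_path
```

The returned local path could then be passed via `--extra_package`, so the stager never has to decide whether to download anything itself.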

##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -569,12 +586,17 @@ def _create_extra_packages(extra_packages, temp_dir):
 
       if not os.path.isfile(package):
         if Stager._is_remote_path(package):
-          # Download remote package.
-          _LOGGER.info(
-              'Downloading extra package: %s locally before staging', package)
-          _, last_component = FileSystems.split(package)
-          local_file_path = FileSystems.join(staging_temp_dir, last_component)
-          Stager._download_file(package, local_file_path)
+          if Stager._should_download_remote_extra_packages(package):
+            # Download remote package.
+            _LOGGER.info(
+                'Downloading extra package: %s locally before staging', package)
+            _, last_component = FileSystems.split(package)
+            local_file_path = FileSystems.join(staging_temp_dir, last_component)
+            Stager._download_file(package, local_file_path)
+          else:
+            remote_packages.append(package)
+            _LOGGER.info(
+                'Deferring download of extra package: %s to workers', package)

Review comment:
       This is potentially a breaking change for some folks (if their retrieval service doesn't have the right networking/permissions but their stager does) so it's worth documenting in the release notes (https://github.com/apache/beam/blob/master/CHANGES.md).

##########
File path: sdks/python/apache_beam/runners/portability/artifact_service_test.py
##########
@@ -122,6 +124,24 @@ def test_url_retrieval(self):
     with open(__file__, 'rb') as fin:
       self.assertEqual(content, fin.read())
 
+  def test_gcs_retrieval(self):
+    retrieval_service = artifact_service.ArtifactRetrievalService(None)
+    url_dep = beam_runner_api_pb2.ArtifactInformation(
+        type_urn=common_urns.artifact_types.URL.urn,
+        type_payload=beam_runner_api_pb2.ArtifactUrlPayload(
+            url='gs://test_gcs_retrieval').SerializeToString())
+    with mock.patch('apache_beam.runners.portability.artifact_service.'
+                    'GCSFileSystem.open') as mock_open:
+      mock_read_handle = mock.Mock()
+      mock_read_handle.read.return_value = b''
+      mock_open.return_value = mock_read_handle
+      content = b''.join([

Review comment:
       Nit: `content` variable is unused.
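One way to address this nit is to have the mock return real bytes and assert on the joined result. The following is a self-contained sketch that uses only `unittest.mock`. `read_chunks` is a hypothetical stand-in for the chunked read that `GetArtifact` performs and does not reproduce Beam's implementation.

```python
import unittest
from unittest import mock


def read_chunks(read_handle, chunk_size=4):
    """Hypothetical stand-in: yield chunks until read() returns b''."""
    while True:
        chunk = read_handle.read(chunk_size)
        if not chunk:
            break
        yield chunk


class ReadChunksTest(unittest.TestCase):

    def test_content_is_checked(self):
        mock_read_handle = mock.Mock()
        # Return two chunks, then b'' to signal end of file.
        mock_read_handle.read.side_effect = [b'abcd', b'ef', b'']
        content = b''.join(read_chunks(mock_read_handle))
        # Asserting on `content` means the variable is no longer unused.
        self.assertEqual(content, b'abcdef')
        self.assertEqual(mock_read_handle.read.call_count, 3)
```

Run it with `python -m unittest`. With the original `return_value = b''`, the equivalent check would simply be that `content` equals `b''`.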

##########
File path: sdks/python/apache_beam/runners/portability/artifact_service.py
##########
@@ -77,8 +79,10 @@ def GetArtifact(self, request, context=None):
     elif request.artifact.type_urn == common_urns.artifact_types.URL.urn:
       payload = proto_utils.parse_Bytes(
           request.artifact.type_payload, beam_runner_api_pb2.ArtifactUrlPayload)
-      # TODO(Py3): Remove the unneeded contextlib wrapper.
-      read_handle = contextlib.closing(urlopen(payload.url))
+      if FileSystems.get_scheme(payload.url) == GCSFileSystem.scheme():

Review comment:
       Can we generalize this to any Beam filesystem (perhaps using `FileSystems.open`)? https://beam.apache.org/releases/pydoc/2.30.0/apache_beam.io.filesystems.html#apache_beam.io.filesystems.FileSystems.open
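The sketch below shows the generalization suggested above. It assumes HTTP(S) URLs keep going through `urlopen` and that every other scheme is delegated to a filesystem-aware opener. `open_artifact_url` is hypothetical. In Beam, `filesystem_open` would presumably be `FileSystems.open` (linked above). Both openers are injected here so the dispatch logic can be shown without Beam installed.

```python
from urllib.parse import urlparse
from urllib.request import urlopen

# Schemes that plain urllib can retrieve directly.
_URLLIB_SCHEMES = frozenset(['http', 'https'])


def open_artifact_url(url, filesystem_open, url_open=urlopen):
    """Return a readable binary handle for an artifact URL.

    HTTP(S) URLs use `url_open`; any other scheme (gs://, s3://, hdfs://,
    or a plain local path) is handed to `filesystem_open`, so GCS is not
    special-cased.
    """
    scheme = urlparse(url).scheme.lower()
    if scheme in _URLLIB_SCHEMES:
        return url_open(url)
    return filesystem_open(url)
```

Because `FileSystems.open` resolves the scheme against the registered filesystems, this approach would cover whichever filesystems are available on the retrieval side, not just GCS.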

##########
File path: sdks/python/apache_beam/runners/portability/artifact_service_test.py
##########
@@ -122,6 +124,24 @@ def test_url_retrieval(self):
     with open(__file__, 'rb') as fin:
       self.assertEqual(content, fin.read())
 
+  def test_gcs_retrieval(self):
+    retrieval_service = artifact_service.ArtifactRetrievalService(None)
+    url_dep = beam_runner_api_pb2.ArtifactInformation(
+        type_urn=common_urns.artifact_types.URL.urn,
+        type_payload=beam_runner_api_pb2.ArtifactUrlPayload(
+            url='gs://test_gcs_retrieval').SerializeToString())

Review comment:
       Nit: though it probably doesn't matter here, `gs://test_gcs_retrieval` is a bucket, not a valid GCS object path.
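To illustrate the bucket vs. object distinction, the following sketch defines a hypothetical `split_gcs_path` helper (not Beam's API). It accepts `gs://bucket/object` and rejects bucket-only paths such as `gs://test_gcs_retrieval`.

```python
import re

# gs://<bucket>/<object>; the object part must be non-empty.
_GCS_OBJECT_RE = re.compile(r'^gs://([^/]+)/(.+)$')


def split_gcs_path(path):
    """Return (bucket, object_name), or raise ValueError for non-object paths."""
    match = _GCS_OBJECT_RE.match(path)
    if match is None:
        raise ValueError('Not a GCS object path: %r' % path)
    return match.group(1), match.group(2)
```

A test URL such as `gs://test_gcs_retrieval/file.txt` would satisfy this check.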




-- 
This is an automated message from the Apache Git Service.


Issue Time Tracking
-------------------

    Worklog Id:     (was: 617354)
    Time Spent: 1h 10m  (was: 1h)

> Support GCS files for extra_requirements argument in Python Beam portable runners
> ---------------------------------------------------------------------------------
>
>                 Key: BEAM-11275
>                 URL: https://issues.apache.org/jira/browse/BEAM-11275
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Gerard Casas Saez
>            Assignee: Calvin Leung
>            Priority: P2
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Currently, portable runners only support locally available files when adding dependencies for remote workers. This can be seen at https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/stager.py#L429 because it uses shutil.copyfile when it detects that a file is remote but not an HTTP URL.
> A simple extension would be to make _is_remote_path in Stager detect whether the path matches any registered filesystem and, if it does, skip the download and let the file be copied afterwards.
> Acceptance criteria:
> - `extra_package` can be a GCS path instead of being restricted to local files.
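The extension described in the issue could look roughly like the following sketch. It is not Beam's `Stager._is_remote_path`. `is_remote_path` and `REGISTERED_SCHEMES` are hypothetical, and the set stands in for whatever schemes the installed Beam filesystems register. In Beam itself, `FileSystems.get_scheme` extracts the scheme from a path.

```python
import re

# Hypothetical stand-in for the schemes registered by Beam filesystems.
REGISTERED_SCHEMES = frozenset(['gs', 's3', 'hdfs', 'http', 'https'])

# A URL scheme followed by "://", e.g. "gs://bucket/pkg.tar.gz".
_SCHEME_RE = re.compile(r'^([a-zA-Z][a-zA-Z0-9+.-]*)://')


def is_remote_path(path, schemes=REGISTERED_SCHEMES):
    """Return True if `path` uses a scheme handled by a registered filesystem."""
    match = _SCHEME_RE.match(path)
    return match is not None and match.group(1).lower() in schemes
```

Paths for which this returns True would be passed through for later copying rather than handed to `shutil.copyfile`. Local paths, including Windows paths like `C:\pkgs\pkg.tar.gz`, have no `://` and stay local.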



--
This message was sent by Atlassian Jira
(v8.3.4#803005)