Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/07/01 00:12:42 UTC

[GitHub] [beam] ibzib commented on a change in pull request #15105: [BEAM-11275] Defer remote package download in stager and GetArtifact from GCS

ibzib commented on a change in pull request #15105:
URL: https://github.com/apache/beam/pull/15105#discussion_r661884341



##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -526,6 +534,14 @@ def _create_jar_packages(jar_packages, temp_dir):
 
     return resources
 
+  @staticmethod
+  def _should_download_remote_extra_packages(package):

Review comment:
       I think maybe the best policy is to just never download remote packages here. If the user wanted to download packages, I figure they could just do it themselves and pass them in as local files. WDYT?
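
       A minimal sketch of that "never download" policy, for illustration only. `split_extra_packages` is a hypothetical helper, not part of the stager, and it assumes a path counts as remote whenever it contains `://`:

```python
def split_extra_packages(extra_packages):
    """Partition package paths into (local, remote) without downloading.

    Hypothetical helper: any path containing '://' is treated as remote.
    """
    local, remote = [], []
    for package in extra_packages:
        if '://' in package:
            # Remote path: record it and leave retrieval to the workers.
            remote.append(package)
        else:
            # Local path: staged from the local filesystem as before.
            local.append(package)
    return local, remote


local, remote = split_extra_packages(['dist/a.tar.gz', 'gs://bucket/b.whl'])
```

       Under this policy, a user who needs a remote package staged from their own machine would download it first and pass the local file instead.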

##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -569,12 +586,17 @@ def _create_extra_packages(extra_packages, temp_dir):
 
       if not os.path.isfile(package):
         if Stager._is_remote_path(package):
-          # Download remote package.
-          _LOGGER.info(
-              'Downloading extra package: %s locally before staging', package)
-          _, last_component = FileSystems.split(package)
-          local_file_path = FileSystems.join(staging_temp_dir, last_component)
-          Stager._download_file(package, local_file_path)
+          if Stager._should_download_remote_extra_packages(package):
+            # Download remote package.
+            _LOGGER.info(
+                'Downloading extra package: %s locally before staging', package)
+            _, last_component = FileSystems.split(package)
+            local_file_path = FileSystems.join(staging_temp_dir, last_component)
+            Stager._download_file(package, local_file_path)
+          else:
+            remote_packages.append(package)
+            _LOGGER.info(
+                'Deferring download of extra package: %s to workers', package)

Review comment:
       This is potentially a breaking change for some folks (if their retrieval service doesn't have the right networking/permissions but their stager does) so it's worth documenting in the release notes (https://github.com/apache/beam/blob/master/CHANGES.md).

##########
File path: sdks/python/apache_beam/runners/portability/artifact_service_test.py
##########
@@ -122,6 +124,24 @@ def test_url_retrieval(self):
     with open(__file__, 'rb') as fin:
       self.assertEqual(content, fin.read())
 
+  def test_gcs_retrieval(self):
+    retrieval_service = artifact_service.ArtifactRetrievalService(None)
+    url_dep = beam_runner_api_pb2.ArtifactInformation(
+        type_urn=common_urns.artifact_types.URL.urn,
+        type_payload=beam_runner_api_pb2.ArtifactUrlPayload(
+            url='gs://test_gcs_retrieval').SerializeToString())
+    with mock.patch('apache_beam.runners.portability.artifact_service.'
+                    'GCSFileSystem.open') as mock_open:
+      mock_read_handle = mock.Mock()
+      mock_read_handle.read.return_value = b''
+      mock_open.return_value = mock_read_handle
+      content = b''.join([

Review comment:
       Nit: `content` variable is unused.

##########
File path: sdks/python/apache_beam/runners/portability/artifact_service.py
##########
@@ -77,8 +79,10 @@ def GetArtifact(self, request, context=None):
     elif request.artifact.type_urn == common_urns.artifact_types.URL.urn:
       payload = proto_utils.parse_Bytes(
           request.artifact.type_payload, beam_runner_api_pb2.ArtifactUrlPayload)
-      # TODO(Py3): Remove the unneeded contextlib wrapper.
-      read_handle = contextlib.closing(urlopen(payload.url))
+      if FileSystems.get_scheme(payload.url) == GCSFileSystem.scheme():

Review comment:
       Can we generalize this to any Beam filesystem (perhaps using `FileSystems.open`)? https://beam.apache.org/releases/pydoc/2.30.0/apache_beam.io.filesystems.html#apache_beam.io.filesystems.FileSystems.open
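
       A rough sketch of what the generalized read path might look like. The opener is passed in as a parameter so the example runs without Beam installed; in the service it would presumably be `FileSystems.open`. `read_url_artifact` and `fake_open` are hypothetical names, and whether plain http(s) URLs would still need the existing `urlopen` branch is an assumption to verify:

```python
import contextlib
import io


def read_url_artifact(url, open_fn, chunk_size=1 << 20):
    """Yield the bytes behind `url` in chunks, using `open_fn` to open it.

    `open_fn` is any callable returning a handle with read() and close();
    the scheme-specific logic lives entirely inside that callable.
    """
    with contextlib.closing(open_fn(url)) as handle:
        while True:
            chunk = handle.read(chunk_size)
            if not chunk:
                return
            yield chunk


def fake_open(url):
    # Stand-in opener for this sketch only; returns an in-memory handle.
    return io.BytesIO(b'payload for ' + url.encode('utf-8'))


data = b''.join(
    read_url_artifact('gs://bucket/pkg.whl', fake_open, chunk_size=4))
```

       With this shape there is no scheme check in `GetArtifact` itself, so a test can patch a single opener instead of `GCSFileSystem.open`.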

##########
File path: sdks/python/apache_beam/runners/portability/artifact_service_test.py
##########
@@ -122,6 +124,24 @@ def test_url_retrieval(self):
     with open(__file__, 'rb') as fin:
       self.assertEqual(content, fin.read())
 
+  def test_gcs_retrieval(self):
+    retrieval_service = artifact_service.ArtifactRetrievalService(None)
+    url_dep = beam_runner_api_pb2.ArtifactInformation(
+        type_urn=common_urns.artifact_types.URL.urn,
+        type_payload=beam_runner_api_pb2.ArtifactUrlPayload(
+            url='gs://test_gcs_retrieval').SerializeToString())

Review comment:
       Nit: though it probably doesn't matter here, `gs://test_gcs_retrieval` names a bucket, not a valid GCS object path.
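
       To illustrate the distinction, here is a small standard-library check. `is_gcs_object_path` is a hypothetical helper, not a Beam API; it only inspects the shape of the URL and does not contact GCS:

```python
from urllib.parse import urlsplit


def is_gcs_object_path(url):
    """Return True if `url` looks like gs://<bucket>/<object>."""
    parts = urlsplit(url)
    has_bucket = bool(parts.netloc)
    # A bare bucket has an empty path (or just a trailing slash).
    has_object = bool(parts.path.strip('/'))
    return parts.scheme == 'gs' and has_bucket and has_object


bucket_only = is_gcs_object_path('gs://test_gcs_retrieval')
with_object = is_gcs_object_path('gs://test-bucket/test_gcs_retrieval.txt')
```

       The second URL is a made-up example of the form the test could use instead.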



