You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by bh...@apache.org on 2020/05/28 18:05:27 UTC

[beam] branch release-2.22.0 updated: [BEAM-10052] check hash and avoid duplicated artifacts (#11843)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch release-2.22.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.22.0 by this push:
     new ed016e3  [BEAM-10052] check hash and avoid duplicated artifacts (#11843)
ed016e3 is described below

commit ed016e3ebf4abb1a26367de1507501a5bf07c902
Author: Chamikara Jayalath <ch...@apache.org>
AuthorDate: Thu May 28 11:05:09 2020 -0700

    [BEAM-10052] check hash and avoid duplicated artifacts (#11843)
    
    Co-authored-by: Heejong Lee <he...@gmail.com>
---
 .../apache_beam/runners/dataflow/internal/apiclient.py       | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 06bb260..55649a7 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -583,6 +583,7 @@ class DataflowApplicationClient(object):
       raise RuntimeError('The --temp_location option must be specified.')
 
     resources = []
+    hashs = {}
     for _, env in sorted(pipeline.components.environments.items(),
                          key=lambda kv: kv[0]):
       for dep in env.dependencies:
@@ -595,7 +596,16 @@ class DataflowApplicationClient(object):
         role_payload = (
             beam_runner_api_pb2.ArtifactStagingToRolePayload.FromString(
                 dep.role_payload))
-        resources.append((type_payload.path, role_payload.staged_name))
+        if type_payload.sha256 and type_payload.sha256 in hashs:
+          _LOGGER.info(
+              'Found duplicated artifact: %s (%s)',
+              type_payload.path,
+              type_payload.sha256)
+          dep.role_payload = beam_runner_api_pb2.ArtifactStagingToRolePayload(
+              staged_name=hashs[type_payload.sha256]).SerializeToString()
+        else:
+          resources.append((type_payload.path, role_payload.staged_name))
+          hashs[type_payload.sha256] = role_payload.staged_name
 
     resource_stager = _LegacyDataflowStager(self)
     staged_resources = resource_stager.stage_job_resources(