You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bh...@apache.org on 2020/05/28 18:05:27 UTC
[beam] branch release-2.22.0 updated: [BEAM-10052] check hash and avoid duplicated artifacts (#11843)
This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch release-2.22.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.22.0 by this push:
new ed016e3 [BEAM-10052] check hash and avoid duplicated artifacts (#11843)
ed016e3 is described below
commit ed016e3ebf4abb1a26367de1507501a5bf07c902
Author: Chamikara Jayalath <ch...@apache.org>
AuthorDate: Thu May 28 11:05:09 2020 -0700
[BEAM-10052] check hash and avoid duplicated artifacts (#11843)
Co-authored-by: Heejong Lee <he...@gmail.com>
---
.../apache_beam/runners/dataflow/internal/apiclient.py | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 06bb260..55649a7 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -583,6 +583,7 @@ class DataflowApplicationClient(object):
raise RuntimeError('The --temp_location option must be specified.')
resources = []
+ hashs = {}
for _, env in sorted(pipeline.components.environments.items(),
key=lambda kv: kv[0]):
for dep in env.dependencies:
@@ -595,7 +596,16 @@ class DataflowApplicationClient(object):
role_payload = (
beam_runner_api_pb2.ArtifactStagingToRolePayload.FromString(
dep.role_payload))
- resources.append((type_payload.path, role_payload.staged_name))
+ if type_payload.sha256 and type_payload.sha256 in hashs:
+ _LOGGER.info(
+ 'Found duplicated artifact: %s (%s)',
+ type_payload.path,
+ type_payload.sha256)
+ dep.role_payload = beam_runner_api_pb2.ArtifactStagingToRolePayload(
+ staged_name=hashs[type_payload.sha256]).SerializeToString()
+ else:
+ resources.append((type_payload.path, role_payload.staged_name))
+ hashs[type_payload.sha256] = role_payload.staged_name
resource_stager = _LegacyDataflowStager(self)
staged_resources = resource_stager.stage_job_resources(