Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/19 23:44:30 UTC

[GitHub] [beam] chamikaramj commented on a diff in pull request #22308: Deduplicate identical environments in a pipeline.

chamikaramj commented on code in PR #22308:
URL: https://github.com/apache/beam/pull/22308#discussion_r925042609


##########
sdks/python/apache_beam/pipeline.py:
##########
@@ -919,11 +918,49 @@ def visit_transform(self, transform_node):
         requirements=context.requirements())
     proto.components.transforms[root_transform_id].unique_name = (
         root_transform_id)
+    self.merge_compatible_environments(proto)
     if return_context:
       return proto, context  # type: ignore  # too complicated for now
     else:
       return proto
 
+  @staticmethod
+  def merge_compatible_environments(proto):
+    """Tries to minimize the number of distinct environments by merging
+    those that are compatible (currently defined as identical).
+
+    Mutates proto as contexts may have references to proto.components.
+    """
+    env_map = {}
+    canonical_env = {}
+    files_by_hash = {}
+    for env_id, env in proto.components.environments.items():
+      # First deduplicate any file dependencies by their hash.
+      for dep in env.dependencies:
+        if dep.type_urn == common_urns.artifact_types.FILE.urn:
+          file_payload = beam_runner_api_pb2.ArtifactFilePayload.FromString(
+              dep.type_payload)
+          if file_payload.sha256:
+            if file_payload.sha256 in files_by_hash:
+              file_payload.path = files_by_hash[file_payload.sha256]

Review Comment:
   FYI, we de-duplicate artifacts again before staging: https://github.com/apache/beam/blob/ba62704a4640979763655a25fd1d603e60bc8e80/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py#L663
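   A minimal sketch of that idea, keeping one staged copy per content hash (this is illustrative only, not the actual apiclient code at the link above; it assumes ArtifactInformation protos carrying FILE type payloads):

   from apache_beam.portability import common_urns
   from apache_beam.portability.api import beam_runner_api_pb2

   def unique_file_artifacts(artifacts):
     # Keep one artifact per (sha256, serialized role) pair; artifacts that
     # are not FILE-typed are passed through untouched.
     seen = set()
     for artifact in artifacts:
       if artifact.type_urn == common_urns.artifact_types.FILE.urn:
         payload = beam_runner_api_pb2.ArtifactFilePayload.FromString(
             artifact.type_payload)
         key = (payload.sha256, artifact.role_payload)
         if key in seen:
           continue  # Already scheduled for staging; skip the duplicate.
         seen.add(key)
       yield artifact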



##########
sdks/python/apache_beam/pipeline.py:
##########
@@ -919,11 +918,49 @@ def visit_transform(self, transform_node):
         requirements=context.requirements())
     proto.components.transforms[root_transform_id].unique_name = (
         root_transform_id)
+    self.merge_compatible_environments(proto)
     if return_context:
       return proto, context  # type: ignore  # too complicated for now
     else:
       return proto
 
+  @staticmethod
+  def merge_compatible_environments(proto):
+    """Tries to minimize the number of distinct environments by merging
+    those that are compatible (currently defined as identical).
+
+    Mutates proto as contexts may have references to proto.components.
+    """
+    env_map = {}
+    canonical_env = {}
+    files_by_hash = {}
+    for env_id, env in proto.components.environments.items():
+      # First deduplicate any file dependencies by their hash.
+      for dep in env.dependencies:

Review Comment:
   Probably we should sort dependencies before de-duping?
   Otherwise, two environments with the same normalized set of files might end up picking different canonical files depending on the order in which the dependencies appear, and so fail to merge.
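   One way to make this order-independent (a sketch, not part of the PR; it assumes that sorting by each dependency's deterministic serialization is an acceptable canonical order):

   from apache_beam.portability.api import beam_runner_api_pb2

   def sort_dependencies(env):
     # Sort the repeated dependencies field by deterministic proto bytes so
     # that two environments with the same set of artifacts serialize
     # identically regardless of insertion order.
     serialized = sorted(
         dep.SerializeToString(deterministic=True) for dep in env.dependencies)
     del env.dependencies[:]
     env.dependencies.extend(
         beam_runner_api_pb2.ArtifactInformation.FromString(s)
         for s in serialized)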



##########
sdks/python/apache_beam/pipeline_test.py:
##########
@@ -1291,6 +1291,56 @@ def CompositeTransform(pcoll):
         count += 1
     assert count == 2
 
+  def test_environments_are_deduplicated(self):
+    def file_artifact(path, hash, staged_name):
+      return beam_runner_api_pb2.ArtifactInformation(
+          type_urn=common_urns.artifact_types.FILE.urn,
+          type_payload=beam_runner_api_pb2.ArtifactFilePayload(
+              path=path, sha256=hash).SerializeToString(),
+          role_urn=common_urns.artifact_roles.STAGING_TO.urn,
+          role_payload=beam_runner_api_pb2.ArtifactStagingToRolePayload(
+              staged_name=staged_name).SerializeToString(),
+      )
+
+    proto = beam_runner_api_pb2.Pipeline(
+        components=beam_runner_api_pb2.Components(
+            transforms={
+                'transform1': beam_runner_api_pb2.PTransform(
+                    environment_id='e1'),
+                'transform2': beam_runner_api_pb2.PTransform(
+                    environment_id='e2'),
+                'transform3': beam_runner_api_pb2.PTransform(
+                    environment_id='e3'),
+                'transform4': beam_runner_api_pb2.PTransform(
+                    environment_id='e4'),
+            },
+            environments={
+                'e1': beam_runner_api_pb2.Environment(
+                    dependencies=[file_artifact('a1', 'x', 'a')]),
+                'e2': beam_runner_api_pb2.Environment(
+                    dependencies=[file_artifact('a2', 'x', 'a')]),
+                'e3': beam_runner_api_pb2.Environment(
+                    dependencies=[file_artifact('a3', 'y', 'a')]),

Review Comment:
   We should probably also add a negative test for the case where the artifacts are the same but other properties (for example, capabilities) differ.
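   A rough sketch of such a negative test (the test name and capability strings are illustrative; it assumes the same imports as the quoted test, that the file_artifact helper is shared rather than local to one test method, and that merge_compatible_environments is callable as a static method on Pipeline, as the diff suggests):

   def test_environments_with_different_capabilities_not_merged(self):
     proto = beam_runner_api_pb2.Pipeline(
         components=beam_runner_api_pb2.Components(
             environments={
                 # Same artifact by hash and staged name, but different
                 # capabilities, so the environments are not identical and
                 # should not be merged.
                 'e1': beam_runner_api_pb2.Environment(
                     dependencies=[file_artifact('a1', 'x', 'a')],
                     capabilities=['beam:coder:varint:v1']),
                 'e2': beam_runner_api_pb2.Environment(
                     dependencies=[file_artifact('a2', 'x', 'a')],
                     capabilities=['beam:coder:bytes:v1']),
             }))
     Pipeline.merge_compatible_environments(proto)
     self.assertEqual(len(proto.components.environments), 2)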



##########
sdks/python/apache_beam/pipeline_test.py:
##########
@@ -1291,6 +1291,56 @@ def CompositeTransform(pcoll):
         count += 1
     assert count == 2
 
+  def test_environments_are_deduplicated(self):
+    def file_artifact(path, hash, staged_name):
+      return beam_runner_api_pb2.ArtifactInformation(
+          type_urn=common_urns.artifact_types.FILE.urn,
+          type_payload=beam_runner_api_pb2.ArtifactFilePayload(
+              path=path, sha256=hash).SerializeToString(),
+          role_urn=common_urns.artifact_roles.STAGING_TO.urn,
+          role_payload=beam_runner_api_pb2.ArtifactStagingToRolePayload(
+              staged_name=staged_name).SerializeToString(),
+      )
+
+    proto = beam_runner_api_pb2.Pipeline(
+        components=beam_runner_api_pb2.Components(
+            transforms={
+                'transform1': beam_runner_api_pb2.PTransform(
+                    environment_id='e1'),
+                'transform2': beam_runner_api_pb2.PTransform(
+                    environment_id='e2'),
+                'transform3': beam_runner_api_pb2.PTransform(
+                    environment_id='e3'),
+                'transform4': beam_runner_api_pb2.PTransform(
+                    environment_id='e4'),
+            },
+            environments={
+                'e1': beam_runner_api_pb2.Environment(
+                    dependencies=[file_artifact('a1', 'x', 'a')]),

Review Comment:
   Add a test for multiple artifacts?
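   A possible shape for a multiple-artifact case (illustrative only; it reuses the file_artifact helper from the quoted test, which would need to be shared between test methods, and assumes that environments whose file dependencies normalize to the same set are merged into one):

   def test_environments_with_multiple_artifacts_are_deduplicated(self):
     proto = beam_runner_api_pb2.Pipeline(
         components=beam_runner_api_pb2.Components(
             environments={
                 # The same two artifacts by hash and staged name; only the
                 # local paths differ, so the environments should merge.
                 'e1': beam_runner_api_pb2.Environment(
                     dependencies=[
                         file_artifact('a1', 'x', 'a'),
                         file_artifact('b1', 'y', 'b')]),
                 'e2': beam_runner_api_pb2.Environment(
                     dependencies=[
                         file_artifact('a2', 'x', 'a'),
                         file_artifact('b2', 'y', 'b')]),
             }))
     Pipeline.merge_compatible_environments(proto)
     self.assertEqual(len(proto.components.environments), 1)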



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org