You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ro...@apache.org on 2023/10/24 23:54:23 UTC

[beam] branch master updated: [PYTHON] Add new `--auto_unique_labels` option to StandardOptions (#28984)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 48722f1bfb4 [PYTHON] Add new `--auto_unique_labels` option to StandardOptions  (#28984)
48722f1bfb4 is described below

commit 48722f1bfb4b0e81d29c8ba595944a3828af877f
Author: Hai Joey Tran <jo...@schrodinger.com>
AuthorDate: Tue Oct 24 19:54:16 2023 -0400

    [PYTHON] Add new `--auto_unique_labels` option to StandardOptions  (#28984)
---
 .../python/apache_beam/options/pipeline_options.py |  8 ++++++
 sdks/python/apache_beam/pipeline.py                | 33 ++++++++++++++++++----
 sdks/python/apache_beam/pipeline_test.py           | 27 ++++++++++++++++++
 3 files changed, 62 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 3fbf7eff7dd..76b776779cf 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -515,6 +515,14 @@ class StandardOptions(PipelineOptions):
             'at transform level. Interpretation of hints is defined by '
             'Beam runners.'))
 
+    parser.add_argument(
+        '--auto_unique_labels',
+        default=False,
+        action='store_true',
+        help='Whether to automatically generate unique transform labels '
+        'for every transform. The default behavior is to raise an '
+        'exception if a transform is created with a non-unique label.')
+
 
 class CrossLanguageOptions(PipelineOptions):
   @classmethod
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index f52616307e7..ed0736250d1 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -54,6 +54,7 @@ import re
 import shutil
 import tempfile
 import unicodedata
+import uuid
 from collections import defaultdict
 from typing import TYPE_CHECKING
 from typing import Any
@@ -681,13 +682,20 @@ class Pipeline(HasDisplayData):
       alter_label_if_ipython(transform, pvalueish)
 
     full_label = '/'.join(
-        [self._current_transform().full_label, label or
-         transform.label]).lstrip('/')
+        [self._current_transform().full_label, transform.label]).lstrip('/')
     if full_label in self.applied_labels:
-      raise RuntimeError(
-          'A transform with label "%s" already exists in the pipeline. '
-          'To apply a transform with a specified label write '
-          'pvalue | "label" >> transform' % full_label)
+      auto_unique_labels = self._options.view_as(
+          StandardOptions).auto_unique_labels
+      if auto_unique_labels:
+        # If auto_unique_labels is set, we will append a unique suffix to the
+        # label to make it unique.
+        unique_label = self._generate_unique_label(transform)
+        return self.apply(transform, pvalueish, unique_label)
+      else:
+        raise RuntimeError(
+            'A transform with label "%s" already exists in the pipeline. '
+            'To apply a transform with a specified label write '
+            'pvalue | "label" >> transform' % full_label)
     self.applied_labels.add(full_label)
 
     pvalueish, inputs = transform._extract_input_pvalues(pvalueish)
@@ -763,6 +771,19 @@ class Pipeline(HasDisplayData):
       self.transforms_stack.pop()
     return pvalueish_result
 
+  def _generate_unique_label(
+      self,
+      transform  # type: ptransform.PTransform
+  ):
+    # type: (...) -> str
+    """Return ``transform.label`` plus a random 6-hex-digit suffix.
+
+    The suffix is random rather than checked against existing labels; the
+    caller (``apply``) re-validates the label it gets back.
+    """
+    unique_suffix = uuid.uuid4().hex[:6]
+    return '%s_%s' % (transform.label, unique_suffix)
+
   def _infer_result_type(
       self,
       transform,  # type: ptransform.PTransform
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index c9ac4ce4c13..113d1a99990 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -22,6 +22,7 @@
 import copy
 import platform
 import unittest
+import uuid
 
 import mock
 import pytest
@@ -266,6 +267,32 @@ class PipelineTest(unittest.TestCase):
         'pipeline. To apply a transform with a specified label write '
         'pvalue | "label" >> transform')
 
+  def test_auto_unique_labels(self):
+    def identity(x):
+      return x
+
+    opts = PipelineOptions(["--auto_unique_labels"])
+    with TestPipeline(options=opts) as pipeline:
+      # Patch uuid4 only while the graph is built so that uuid4 calls made
+      # later (e.g. when the pipeline runs on exit) see the real function.
+      with mock.patch.object(uuid, 'uuid4') as mock_uuid_gen:
+        mock_uuids = [mock.Mock(hex='UUID01XXX'), mock.Mock(hex='UUID02XXX')]
+        mock_uuid_gen.side_effect = mock_uuids
+        pcoll = pipeline | 'pcoll' >> Create([1, 2, 3])
+        pcoll4 = pcoll | Map(identity) | Map(identity) | Map(identity)
+      # Only the 2nd and 3rd Map(identity) collide, so uuid4 runs twice.
+      self.assertEqual(mock_uuid_gen.call_count, 2)
+      assert_that(pcoll4, equal_to([1, 2, 3]))
+
+    map_id_leaf_labels = {
+        label.split(":")[-1]
+        for label in pipeline.applied_labels if "Map(identity)" in label
+    }
+    # Only the first 6 chars of the UUID hex should be used.
+    self.assertEqual(
+        map_id_leaf_labels,
+        {"Map(identity)", "Map(identity)_UUID01", "Map(identity)_UUID02"})
+
   def test_reuse_cloned_custom_transform_instance(self):
     with TestPipeline() as pipeline:
       pcoll1 = pipeline | 'pc1' >> Create([1, 2, 3])