You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2023/10/24 23:54:23 UTC
[beam] branch master updated: [PYTHON] Add new `--auto_unique_labels` option to StandardOptions (#28984)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 48722f1bfb4 [PYTHON] Add new `--auto_unique_labels` option to StandardOptions (#28984)
48722f1bfb4 is described below
commit 48722f1bfb4b0e81d29c8ba595944a3828af877f
Author: Hai Joey Tran <jo...@schrodinger.com>
AuthorDate: Tue Oct 24 19:54:16 2023 -0400
[PYTHON] Add new `--auto_unique_labels` option to StandardOptions (#28984)
---
.../python/apache_beam/options/pipeline_options.py | 8 ++++++
sdks/python/apache_beam/pipeline.py | 33 ++++++++++++++++++----
sdks/python/apache_beam/pipeline_test.py | 27 ++++++++++++++++++
3 files changed, 62 insertions(+), 6 deletions(-)
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 3fbf7eff7dd..76b776779cf 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -515,6 +515,14 @@ class StandardOptions(PipelineOptions):
'at transform level. Interpretation of hints is defined by '
'Beam runners.'))
+ parser.add_argument(
+ '--auto_unique_labels',
+ default=False,
+ action='store_true',
+ help='Whether to automatically generate unique transform labels '
+ 'for every transform. The default behavior is to raise an '
+ 'exception if a transform is created with a non-unique label.')
+
class CrossLanguageOptions(PipelineOptions):
@classmethod
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index f52616307e7..ed0736250d1 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -54,6 +54,7 @@ import re
import shutil
import tempfile
import unicodedata
+import uuid
from collections import defaultdict
from typing import TYPE_CHECKING
from typing import Any
@@ -681,13 +682,20 @@ class Pipeline(HasDisplayData):
alter_label_if_ipython(transform, pvalueish)
full_label = '/'.join(
- [self._current_transform().full_label, label or
- transform.label]).lstrip('/')
+ [self._current_transform().full_label, transform.label]).lstrip('/')
if full_label in self.applied_labels:
- raise RuntimeError(
- 'A transform with label "%s" already exists in the pipeline. '
- 'To apply a transform with a specified label write '
- 'pvalue | "label" >> transform' % full_label)
+ auto_unique_labels = self._options.view_as(
+ StandardOptions).auto_unique_labels
+ if auto_unique_labels:
+ # If auto_unique_labels is set, we will append a unique suffix to the
+ # label to make it unique.
+ unique_label = self._generate_unique_label(transform)
+ return self.apply(transform, pvalueish, unique_label)
+ else:
+ raise RuntimeError(
+ 'A transform with label "%s" already exists in the pipeline. '
+ 'To apply a transform with a specified label write '
+ 'pvalue | "label" >> transform' % full_label)
self.applied_labels.add(full_label)
pvalueish, inputs = transform._extract_input_pvalues(pvalueish)
@@ -763,6 +771,19 @@ class Pipeline(HasDisplayData):
self.transforms_stack.pop()
return pvalueish_result
+ def _generate_unique_label(
+ self,
+ transform # type: ptransform.PTransform
+ ):
+ # type: (...) -> str
+
+ """Build a suffixed variant of a transform's current label.
+
+ The result is the transform's current label, an underscore, and the first
+ six characters of the hex form of a freshly generated uuid4. This method
+ does not itself compare the result against labels that are already in use.
+
+ Args:
+ transform: the transform whose ``label`` attribute is used as the prefix.
+
+ Returns:
+ The suffixed label string.
+ """
+ unique_suffix = uuid.uuid4().hex[:6]
+ return '%s_%s' % (transform.label, unique_suffix)
+
+
def _infer_result_type(
self,
transform, # type: ptransform.PTransform
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index c9ac4ce4c13..113d1a99990 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -22,6 +22,7 @@
import copy
import platform
import unittest
+import uuid
import mock
import pytest
@@ -266,6 +267,32 @@ class PipelineTest(unittest.TestCase):
'pipeline. To apply a transform with a specified label write '
'pvalue | "label" >> transform')
+ def test_auto_unique_labels(self):
+ # With --auto_unique_labels set, applying Map(identity) three times without
+ # explicit labels is expected to yield three distinct labels: the plain one
+ # plus two carrying a uuid-derived suffix.
+
+ opts = PipelineOptions(["--auto_unique_labels"])
+ with mock.patch.object(uuid, 'uuid4') as mock_uuid_gen:
+ # Make the suffixes deterministic: successive uuid4() calls return these
+ # mocks in order. Each hex value is longer than six characters so the
+ # final assertion can check that only a six-character prefix is used.
+ mock_uuids = [mock.Mock(hex='UUID01XXX'), mock.Mock(hex='UUID02XXX')]
+ mock_uuid_gen.side_effect = mock_uuids
+ with TestPipeline(options=opts) as pipeline:
+ pcoll = pipeline | 'pcoll' >> Create([1, 2, 3])
+
+ def identity(x):
+ return x
+
+ # Three unlabeled applications of the same Map, so all three start out
+ # with the same label.
+ pcoll2 = pcoll | Map(identity)
+ pcoll3 = pcoll2 | Map(identity)
+ pcoll4 = pcoll3 | Map(identity)
+ assert_that(pcoll4, equal_to([1, 2, 3]))
+
+ # Collect every applied label that mentions Map(identity).
+ map_id_full_labels = {
+ label
+ for label in pipeline.applied_labels if "Map(identity)" in label
+ }
+ # NOTE(review): none of the expected labels below contains ':', so this
+ # split leaves them unchanged; confirm whether '/' was intended.
+ map_id_leaf_labels = {label.split(":")[-1] for label in map_id_full_labels}
+ # Only the first 6 chars of the UUID hex should be used
+ assert map_id_leaf_labels == set(
+ ["Map(identity)", "Map(identity)_UUID01", "Map(identity)_UUID02"])
+
def test_reuse_cloned_custom_transform_instance(self):
with TestPipeline() as pipeline:
pcoll1 = pipeline | 'pc1' >> Create([1, 2, 3])