Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/26 23:41:34 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #12067: [BEAM-10308] Make component ID assignments consistent across PipelineContext instances

TheNeuralBit commented on a change in pull request #12067:
URL: https://github.com/apache/beam/pull/12067#discussion_r446452942



##########
File path: sdks/python/apache_beam/runners/pipeline_context.py
##########
@@ -49,6 +50,52 @@
   from apache_beam.coders.coder_impl import IterableStateWriter
 
 
+class _UniqueRefAssigner(object):
+  """Utility for assigning unique refs to proto messages for use in components.
+
+  Instances of _UniqueRefAssigner are global (scoped by the base string). That
+  way once a unique ref is assigned it will be used consistently across
+  PipelineContext instances.
+  """
+  _INSTANCES = {}  # type: Dict[Tuple[Pipeline, str], _UniqueRefAssigner]
+
+  def __init__(self, base):
+    self._base = base
+    self._counter = 0
+    self._obj_to_id = {}  # type: Dict[Any, str]
+
+  def get_or_assign(self, obj=None, label=None):
+    # type: (Optional[Any], Optional[str]) -> str
+
+    """Retrieve the unique ref for the given object.
+
+    Generates and assigns a unique ref if one hasn't been assigned yet. label
+    will be incorporated into the unique ref when assigning a new unique ref,
+    otherwise it is ignored."""
+    if obj not in self._obj_to_id:
+      self._obj_to_id[obj] = self._unique_ref(obj, label)
+
+    return self._obj_to_id[obj]
+
+  def _unique_ref(self, obj=None, label=None):
+    self._counter += 1
+    return "%s_%s_%d" % (self._base, label or type(obj).__name__, self._counter)
+
+  @classmethod
+  def get_instance(cls, pipeline, base):
+    # type: (Optional[Pipeline], str) -> _UniqueRefAssigner
+
+    """Return the _UniqueRefAssigner with the given base string.
+
+    Creates a new instance if one doesn't already exist for this base string."""
+    key = (id(pipeline), base)

Review comment:
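       Partitioning the cached ID assignments by `id(pipeline)` is a sub-par and dangerous solution because `id()` is only guaranteed to be unique among objects whose lifetimes overlap; two objects with non-overlapping lifetimes may share the same `id()` value. Once a pipeline is garbage-collected, a new pipeline could receive the same `id` and silently pick up the old pipeline's cached assignments. If we go with something like this approach, we need to at least tie these instances' lifetimes to the lifetime of the pipeline.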



