Posted to commits@beam.apache.org by al...@apache.org on 2018/10/30 18:53:26 UTC

[beam] branch master updated: [BEAM-5617] Use bytes consistently for pcollection ids. (#6844)

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f41b7e4  [BEAM-5617] Use bytes consistently for pcollection ids. (#6844)
f41b7e4 is described below

commit f41b7e4671fb443d4e076a2fd3b3bfe09afb4925
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Tue Oct 30 19:53:18 2018 +0100

    [BEAM-5617] Use bytes consistently for pcollection ids. (#6844)
    
    * [BEAM-5617] Use bytes consistently for pcollection ids.
---
 .../runners/portability/fn_api_runner.py           | 77 ++++++++++++----------
 1 file changed, 42 insertions(+), 35 deletions(-)
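
In short: buffer ids used to be built ad hoc ('impulse:...', 'group:...',
'materialize:...', 'timers:...'), sometimes as str and sometimes as bytes.
This commit routes the named kinds (group, materialize, timers) through two
helpers so every id is bytes of the form b'<kind>:<name>', while the impulse
case becomes a bare b'impulse' constant compared by equality instead of by
prefix. A minimal sketch of the convention (helper bodies copied from the
diff below; the assertions are illustrative, with a hypothetical pcollection
name):

    def create_buffer_id(name, kind='materialize'):
      return ('%s:%s' % (kind, name)).encode('utf-8')

    def split_buffer_id(buffer_id):
      # Split only on the first ':' so names may themselves contain colons.
      return buffer_id.decode('utf-8').split(':', 1)

    assert create_buffer_id('pcoll_1') == b'materialize:pcoll_1'
    assert split_buffer_id(b'timers:pcoll_1') == ['timers', 'pcoll_1']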

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 6741408..832cba9 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -62,7 +62,7 @@ ENCODED_IMPULSE_VALUE = beam.coders.WindowedValueCoder(
     beam.coders.coders.GlobalWindowCoder()).get_impl().encode_nested(
         beam.transforms.window.GlobalWindows.windowed_value(b''))
 
-IMPULSE_BUFFER_PREFIX = b'impulse:'
+IMPULSE_BUFFER = b'impulse'
 
 
 class BeamFnControlServicer(beam_fn_api_pb2_grpc.BeamFnControlServicer):
@@ -458,14 +458,12 @@ class FnApiRunner(runner.PipelineRunner):
         for transform in list(stage.transforms):
           if transform.spec.urn == common_urns.primitives.IMPULSE.urn:
             stage.transforms.remove(transform)
-            impulse_pc = only_element(transform.outputs.values())
             stage.transforms.append(
                 beam_runner_api_pb2.PTransform(
                     unique_name=transform.unique_name,
                     spec=beam_runner_api_pb2.FunctionSpec(
                         urn=bundle_processor.DATA_INPUT_URN,
-                        payload=IMPULSE_BUFFER_PREFIX +
-                        impulse_pc.encode('utf-8')),
+                        payload=IMPULSE_BUFFER),
                     outputs=transform.outputs))
 
         yield stage
@@ -618,7 +616,7 @@ class FnApiRunner(runner.PipelineRunner):
                 pipeline_components.pcollections[pcoll_id], pipeline_components)
 
           # This is used later to correlate the read and write.
-          param = str("group:%s" % stage.name).encode('utf-8')
+          grouping_buffer = create_buffer_id(stage.name, kind='group')
           if stage.name not in pipeline_components.transforms:
             pipeline_components.transforms[stage.name].CopyFrom(transform)
           gbk_write = Stage(
@@ -628,7 +626,7 @@ class FnApiRunner(runner.PipelineRunner):
                   inputs=transform.inputs,
                   spec=beam_runner_api_pb2.FunctionSpec(
                       urn=bundle_processor.DATA_OUTPUT_URN,
-                      payload=param))],
+                      payload=grouping_buffer))],
               downstream_side_inputs=frozenset(),
               must_follow=stage.must_follow)
           yield gbk_write
@@ -640,7 +638,7 @@ class FnApiRunner(runner.PipelineRunner):
                   outputs=transform.outputs,
                   spec=beam_runner_api_pb2.FunctionSpec(
                       urn=bundle_processor.DATA_INPUT_URN,
-                      payload=param))],
+                      payload=grouping_buffer))],
               downstream_side_inputs=stage.downstream_side_inputs,
               must_follow=union(frozenset([gbk_write]), stage.must_follow))
         else:
@@ -660,7 +658,7 @@ class FnApiRunner(runner.PipelineRunner):
         transform = stage.transforms[0]
         if transform.spec.urn == common_urns.primitives.FLATTEN.urn:
           # This is used later to correlate the read and writes.
-          param = str("materialize:%s" % transform.unique_name).encode('utf-8')
+          buffer_id = create_buffer_id(transform.unique_name)
           output_pcoll_id, = list(transform.outputs.values())
           output_coder_id = pcollections[output_pcoll_id].coder_id
           flatten_writes = []
@@ -696,7 +694,7 @@ class FnApiRunner(runner.PipelineRunner):
                     inputs={local_in: transcoded_pcollection},
                     spec=beam_runner_api_pb2.FunctionSpec(
                         urn=bundle_processor.DATA_OUTPUT_URN,
-                        payload=param))],
+                        payload=buffer_id))],
                 downstream_side_inputs=frozenset(),
                 must_follow=stage.must_follow)
             flatten_writes.append(flatten_write)
@@ -709,7 +707,7 @@ class FnApiRunner(runner.PipelineRunner):
                   outputs=transform.outputs,
                   spec=beam_runner_api_pb2.FunctionSpec(
                       urn=bundle_processor.DATA_INPUT_URN,
-                      payload=param))],
+                      payload=buffer_id))],
               downstream_side_inputs=stage.downstream_side_inputs,
               must_follow=union(frozenset(flatten_writes), stage.must_follow))
 
@@ -808,7 +806,6 @@ class FnApiRunner(runner.PipelineRunner):
 
       # Now try to fuse away all pcollections.
       for pcoll, producer in producers_by_pcoll.items():
-        pcoll_as_param = str("materialize:%s" % pcoll).encode('utf-8')
         write_pcoll = None
         for consumer in consumers_by_pcoll[pcoll]:
           producer = replacement(producer)
@@ -820,6 +817,7 @@ class FnApiRunner(runner.PipelineRunner):
             fuse(producer, consumer)
           else:
             # If we can't fuse, do a read + write.
+            buffer_id = create_buffer_id(pcoll)
             if write_pcoll is None:
               write_pcoll = Stage(
                   pcoll + '/Write',
@@ -828,7 +826,7 @@ class FnApiRunner(runner.PipelineRunner):
                       inputs={'in': pcoll},
                       spec=beam_runner_api_pb2.FunctionSpec(
                           urn=bundle_processor.DATA_OUTPUT_URN,
-                          payload=pcoll_as_param))])
+                          payload=buffer_id))])
               fuse(producer, write_pcoll)
             if consumer.has_as_main_input(pcoll):
               read_pcoll = Stage(
@@ -838,7 +836,7 @@ class FnApiRunner(runner.PipelineRunner):
                       outputs={'out': pcoll},
                       spec=beam_runner_api_pb2.FunctionSpec(
                           urn=bundle_processor.DATA_INPUT_URN,
-                          payload=pcoll_as_param))],
+                          payload=buffer_id))],
                   must_follow=frozenset([write_pcoll]))
               fuse(read_pcoll, consumer)
             else:
@@ -922,16 +920,16 @@ class FnApiRunner(runner.PipelineRunner):
                       outputs={'out': timer_read_pcoll},
                       spec=beam_runner_api_pb2.FunctionSpec(
                           urn=bundle_processor.DATA_INPUT_URN,
-                          payload=('timers:%s' % timer_read_pcoll).encode(
-                              'utf-8'))))
+                          payload=create_buffer_id(
+                              timer_read_pcoll, kind='timers'))))
               stage.transforms.append(
                   beam_runner_api_pb2.PTransform(
                       unique_name=timer_write_pcoll + '/Write',
                       inputs={'in': timer_write_pcoll},
                       spec=beam_runner_api_pb2.FunctionSpec(
                           urn=bundle_processor.DATA_OUTPUT_URN,
-                          payload=('timers:%s' % timer_write_pcoll).encode(
-                              'utf-8'))))
+                          payload=create_buffer_id(
+                              timer_write_pcoll, kind='timers'))))
               assert tag not in transform.inputs
               transform.inputs[tag] = timer_read_pcoll
               assert tag not in transform.outputs
@@ -1044,7 +1042,7 @@ class FnApiRunner(runner.PipelineRunner):
           pcoll_id = transform.spec.payload
           if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
             target = transform.unique_name, only_element(transform.outputs)
-            if pcoll_id.startswith(IMPULSE_BUFFER_PREFIX):
+            if pcoll_id == IMPULSE_BUFFER:
               data_input[target] = [ENCODED_IMPULSE_VALUE]
             else:
               data_input[target] = pcoll_buffers[pcoll_id]
@@ -1067,7 +1065,7 @@ class FnApiRunner(runner.PipelineRunner):
               transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
           for tag, si in payload.side_inputs.items():
             data_side_input[transform.unique_name, tag] = (
-                'materialize:' + transform.inputs[tag],
+                create_buffer_id(transform.inputs[tag]),
                 beam.pvalue.SideInputData.from_runner_api(si, context))
       return data_input, data_side_input, data_output
 
@@ -1090,12 +1088,12 @@ class FnApiRunner(runner.PipelineRunner):
           controller.state_api_service_descriptor().url)
 
     # Store the required side inputs into state.
-    for (transform_id, tag), (pcoll_id, si) in data_side_input.items():
-      actual_pcoll_id = pcoll_id[len(b"materialize:"):]
+    for (transform_id, tag), (buffer_id, si) in data_side_input.items():
+      _, pcoll_id = split_buffer_id(buffer_id)
       value_coder = context.coders[safe_coders[
-          pipeline_components.pcollections[actual_pcoll_id].coder_id]]
+          pipeline_components.pcollections[pcoll_id].coder_id]]
       elements_by_window = _WindowGroupingBuffer(si, value_coder)
-      for element_data in pcoll_buffers[pcoll_id]:
+      for element_data in pcoll_buffers[buffer_id]:
         elements_by_window.append(element_data)
       for key, window, elements_data in elements_by_window.encoded_items():
         state_key = beam_fn_api_pb2.StateKey(
@@ -1106,16 +1104,16 @@ class FnApiRunner(runner.PipelineRunner):
                 key=key))
         controller.state_handler.blocking_append(state_key, elements_data)
 
-    def get_buffer(pcoll_id):
-      if (pcoll_id.startswith(b'materialize:')
-          or pcoll_id.startswith(b'timers:')):
-        if pcoll_id not in pcoll_buffers:
+    def get_buffer(buffer_id):
+      kind, name = split_buffer_id(buffer_id)
+      if kind in ('materialize', 'timers'):
+        if buffer_id not in pcoll_buffers:
           # Just store the data chunks for replay.
-          pcoll_buffers[pcoll_id] = list()
-      elif pcoll_id.startswith(b'group:'):
+          pcoll_buffers[buffer_id] = list()
+      elif kind == 'group':
         # This is a grouping write, create a grouping buffer if needed.
-        if pcoll_id not in pcoll_buffers:
-          original_gbk_transform = pcoll_id.split(b':', 1)[1]
+        if buffer_id not in pcoll_buffers:
+          original_gbk_transform = name
           transform_proto = pipeline_components.transforms[
               original_gbk_transform]
           input_pcoll = only_element(list(transform_proto.inputs.values()))
@@ -1127,13 +1125,13 @@ class FnApiRunner(runner.PipelineRunner):
           windowing_strategy = context.windowing_strategies[
               pipeline_components
               .pcollections[output_pcoll].windowing_strategy_id]
-          pcoll_buffers[pcoll_id] = _GroupingBuffer(
+          pcoll_buffers[buffer_id] = _GroupingBuffer(
               pre_gbk_coder, post_gbk_coder, windowing_strategy)
       else:
         # These should be the only two identifiers we produce for now,
         # but special side input writes may go here.
-        raise NotImplementedError(pcoll_id)
-      return pcoll_buffers[pcoll_id]
+        raise NotImplementedError(buffer_id)
+      return pcoll_buffers[buffer_id]
 
     for k in range(self._bundle_repeat):
       try:
@@ -1153,7 +1151,8 @@ class FnApiRunner(runner.PipelineRunner):
       for transform_id, timer_writes in stage.timer_pcollections:
         windowed_timer_coder_impl = context.coders[
             pipeline_components.pcollections[timer_writes].coder_id].get_impl()
-        written_timers = get_buffer(b'timers:' + timer_writes.encode('utf-8'))
+        written_timers = get_buffer(
+            create_buffer_id(timer_writes, kind='timers'))
         if written_timers:
           # Keep only the "last" timer set per key and window.
           timers_by_key_and_window = {}
@@ -1636,3 +1635,11 @@ def unique_name(existing, prefix):
         return prefix_counter
   else:
     return prefix
+
+
+def create_buffer_id(name, kind='materialize'):
+  return ('%s:%s' % (kind, name)).encode('utf-8')
+
+
+def split_buffer_id(buffer_id):
+  return buffer_id.decode('utf-8').split(':', 1)
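
For reference, the failure mode this removes is visible in the diff above:
the old side-input path built ids as str ('materialize:' +
transform.inputs[tag]) while get_buffer compared against bytes prefixes,
which only holds together on Python 2 where str and bytes coincide. A
standalone illustration (hypothetical name, not part of the commit):

    buffer_id = 'materialize:pcoll_1'  # old-style str id
    try:
      buffer_id.startswith(b'materialize:')
    except TypeError:
      # On Python 3, str.startswith() rejects a bytes prefix outright.
      pass

    # With the helpers, ids are bytes end to end:
    assert create_buffer_id('pcoll_1') == b'materialize:pcoll_1'
    kind, name = split_buffer_id(b'group:MyStage')
    assert (kind, name) == ('group', 'MyStage')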