Posted to commits@beam.apache.org by al...@apache.org on 2018/10/30 18:53:26 UTC
[beam] branch master updated: [BEAM-5617] Use bytes consistently for pcollection ids. (#6844)
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f41b7e4 [BEAM-5617] Use bytes consistently for pcollection ids. (#6844)
f41b7e4 is described below
commit f41b7e4671fb443d4e076a2fd3b3bfe09afb4925
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Tue Oct 30 19:53:18 2018 +0100
[BEAM-5617] Use bytes consistently for pcollection ids. (#6844)
* [BEAM-5617] Use bytes consistently for pcollection ids.
---
.../runners/portability/fn_api_runner.py | 77 ++++++++++++----------
1 file changed, 42 insertions(+), 35 deletions(-)
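
In short, the patch funnels every buffer id through two new helpers,
create_buffer_id and split_buffer_id (added at the end of the diff), so
ids are always bytes of the form b'<kind>:<name>' rather than a mix of
str and bytes built inline at each call site. A minimal round-trip
sketch using just those two helpers as defined in this commit (the
example names themselves are made up):

    def create_buffer_id(name, kind='materialize'):
      return ('%s:%s' % (kind, name)).encode('utf-8')

    def split_buffer_id(buffer_id):
      return buffer_id.decode('utf-8').split(':', 1)

    # Every id the runner produces is plain bytes:
    assert create_buffer_id('pcoll_1') == b'materialize:pcoll_1'
    assert create_buffer_id('Stage1', kind='group') == b'group:Stage1'
    assert create_buffer_id('pcoll_2', kind='timers') == b'timers:pcoll_2'

    # split_buffer_id inverts it, returning [kind, name] as strings.
    # Splitting on the first ':' only, so a name containing ':' survives.
    assert split_buffer_id(b'group:Stage1') == ['group', 'Stage1']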
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 6741408..832cba9 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -62,7 +62,7 @@ ENCODED_IMPULSE_VALUE = beam.coders.WindowedValueCoder(
beam.coders.coders.GlobalWindowCoder()).get_impl().encode_nested(
beam.transforms.window.GlobalWindows.windowed_value(b''))
-IMPULSE_BUFFER_PREFIX = b'impulse:'
+IMPULSE_BUFFER = b'impulse'
class BeamFnControlServicer(beam_fn_api_pb2_grpc.BeamFnControlServicer):
@@ -458,14 +458,12 @@ class FnApiRunner(runner.PipelineRunner):
for transform in list(stage.transforms):
if transform.spec.urn == common_urns.primitives.IMPULSE.urn:
stage.transforms.remove(transform)
- impulse_pc = only_element(transform.outputs.values())
stage.transforms.append(
beam_runner_api_pb2.PTransform(
unique_name=transform.unique_name,
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_INPUT_URN,
- payload=IMPULSE_BUFFER_PREFIX +
- impulse_pc.encode('utf-8')),
+ payload=IMPULSE_BUFFER),
outputs=transform.outputs))
yield stage
@@ -618,7 +616,7 @@ class FnApiRunner(runner.PipelineRunner):
pipeline_components.pcollections[pcoll_id], pipeline_components)
# This is used later to correlate the read and write.
- param = str("group:%s" % stage.name).encode('utf-8')
+ grouping_buffer = create_buffer_id(stage.name, kind='group')
if stage.name not in pipeline_components.transforms:
pipeline_components.transforms[stage.name].CopyFrom(transform)
gbk_write = Stage(
@@ -628,7 +626,7 @@ class FnApiRunner(runner.PipelineRunner):
inputs=transform.inputs,
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_OUTPUT_URN,
- payload=param))],
+ payload=grouping_buffer))],
downstream_side_inputs=frozenset(),
must_follow=stage.must_follow)
yield gbk_write
@@ -640,7 +638,7 @@ class FnApiRunner(runner.PipelineRunner):
outputs=transform.outputs,
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_INPUT_URN,
- payload=param))],
+ payload=grouping_buffer))],
downstream_side_inputs=stage.downstream_side_inputs,
must_follow=union(frozenset([gbk_write]), stage.must_follow))
else:
@@ -660,7 +658,7 @@ class FnApiRunner(runner.PipelineRunner):
transform = stage.transforms[0]
if transform.spec.urn == common_urns.primitives.FLATTEN.urn:
# This is used later to correlate the read and writes.
- param = str("materialize:%s" % transform.unique_name).encode('utf-8')
+ buffer_id = create_buffer_id(transform.unique_name)
output_pcoll_id, = list(transform.outputs.values())
output_coder_id = pcollections[output_pcoll_id].coder_id
flatten_writes = []
@@ -696,7 +694,7 @@ class FnApiRunner(runner.PipelineRunner):
inputs={local_in: transcoded_pcollection},
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_OUTPUT_URN,
- payload=param))],
+ payload=buffer_id))],
downstream_side_inputs=frozenset(),
must_follow=stage.must_follow)
flatten_writes.append(flatten_write)
@@ -709,7 +707,7 @@ class FnApiRunner(runner.PipelineRunner):
outputs=transform.outputs,
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_INPUT_URN,
- payload=param))],
+ payload=buffer_id))],
downstream_side_inputs=stage.downstream_side_inputs,
must_follow=union(frozenset(flatten_writes), stage.must_follow))
@@ -808,7 +806,6 @@ class FnApiRunner(runner.PipelineRunner):
# Now try to fuse away all pcollections.
for pcoll, producer in producers_by_pcoll.items():
- pcoll_as_param = str("materialize:%s" % pcoll).encode('utf-8')
write_pcoll = None
for consumer in consumers_by_pcoll[pcoll]:
producer = replacement(producer)
@@ -820,6 +817,7 @@ class FnApiRunner(runner.PipelineRunner):
fuse(producer, consumer)
else:
# If we can't fuse, do a read + write.
+ buffer_id = create_buffer_id(pcoll)
if write_pcoll is None:
write_pcoll = Stage(
pcoll + '/Write',
@@ -828,7 +826,7 @@ class FnApiRunner(runner.PipelineRunner):
inputs={'in': pcoll},
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_OUTPUT_URN,
- payload=pcoll_as_param))])
+ payload=buffer_id))])
fuse(producer, write_pcoll)
if consumer.has_as_main_input(pcoll):
read_pcoll = Stage(
@@ -838,7 +836,7 @@ class FnApiRunner(runner.PipelineRunner):
outputs={'out': pcoll},
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_INPUT_URN,
- payload=pcoll_as_param))],
+ payload=buffer_id))],
must_follow=frozenset([write_pcoll]))
fuse(read_pcoll, consumer)
else:
@@ -922,16 +920,16 @@ class FnApiRunner(runner.PipelineRunner):
outputs={'out': timer_read_pcoll},
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_INPUT_URN,
- payload=('timers:%s' % timer_read_pcoll).encode(
- 'utf-8'))))
+ payload=create_buffer_id(
+ timer_read_pcoll, kind='timers'))))
stage.transforms.append(
beam_runner_api_pb2.PTransform(
unique_name=timer_write_pcoll + '/Write',
inputs={'in': timer_write_pcoll},
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_OUTPUT_URN,
- payload=('timers:%s' % timer_write_pcoll).encode(
- 'utf-8'))))
+ payload=create_buffer_id(
+ timer_write_pcoll, kind='timers'))))
assert tag not in transform.inputs
transform.inputs[tag] = timer_read_pcoll
assert tag not in transform.outputs
@@ -1044,7 +1042,7 @@ class FnApiRunner(runner.PipelineRunner):
pcoll_id = transform.spec.payload
if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
target = transform.unique_name, only_element(transform.outputs)
- if pcoll_id.startswith(IMPULSE_BUFFER_PREFIX):
+ if pcoll_id == IMPULSE_BUFFER:
data_input[target] = [ENCODED_IMPULSE_VALUE]
else:
data_input[target] = pcoll_buffers[pcoll_id]
@@ -1067,7 +1065,7 @@ class FnApiRunner(runner.PipelineRunner):
transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
for tag, si in payload.side_inputs.items():
data_side_input[transform.unique_name, tag] = (
- 'materialize:' + transform.inputs[tag],
+ create_buffer_id(transform.inputs[tag]),
beam.pvalue.SideInputData.from_runner_api(si, context))
return data_input, data_side_input, data_output
@@ -1090,12 +1088,12 @@ class FnApiRunner(runner.PipelineRunner):
controller.state_api_service_descriptor().url)
# Store the required side inputs into state.
- for (transform_id, tag), (pcoll_id, si) in data_side_input.items():
- actual_pcoll_id = pcoll_id[len(b"materialize:"):]
+ for (transform_id, tag), (buffer_id, si) in data_side_input.items():
+ _, pcoll_id = split_buffer_id(buffer_id)
value_coder = context.coders[safe_coders[
- pipeline_components.pcollections[actual_pcoll_id].coder_id]]
+ pipeline_components.pcollections[pcoll_id].coder_id]]
elements_by_window = _WindowGroupingBuffer(si, value_coder)
- for element_data in pcoll_buffers[pcoll_id]:
+ for element_data in pcoll_buffers[buffer_id]:
elements_by_window.append(element_data)
for key, window, elements_data in elements_by_window.encoded_items():
state_key = beam_fn_api_pb2.StateKey(
@@ -1106,16 +1104,16 @@ class FnApiRunner(runner.PipelineRunner):
key=key))
controller.state_handler.blocking_append(state_key, elements_data)
- def get_buffer(pcoll_id):
- if (pcoll_id.startswith(b'materialize:')
- or pcoll_id.startswith(b'timers:')):
- if pcoll_id not in pcoll_buffers:
+ def get_buffer(buffer_id):
+ kind, name = split_buffer_id(buffer_id)
+ if kind in ('materialize', 'timers'):
+ if buffer_id not in pcoll_buffers:
# Just store the data chunks for replay.
- pcoll_buffers[pcoll_id] = list()
- elif pcoll_id.startswith(b'group:'):
+ pcoll_buffers[buffer_id] = list()
+ elif kind == 'group':
# This is a grouping write, create a grouping buffer if needed.
- if pcoll_id not in pcoll_buffers:
- original_gbk_transform = pcoll_id.split(b':', 1)[1]
+ if buffer_id not in pcoll_buffers:
+ original_gbk_transform = name
transform_proto = pipeline_components.transforms[
original_gbk_transform]
input_pcoll = only_element(list(transform_proto.inputs.values()))
@@ -1127,13 +1125,13 @@ class FnApiRunner(runner.PipelineRunner):
windowing_strategy = context.windowing_strategies[
pipeline_components
.pcollections[output_pcoll].windowing_strategy_id]
- pcoll_buffers[pcoll_id] = _GroupingBuffer(
+ pcoll_buffers[buffer_id] = _GroupingBuffer(
pre_gbk_coder, post_gbk_coder, windowing_strategy)
else:
# These should be the only two identifiers we produce for now,
# but special side input writes may go here.
- raise NotImplementedError(pcoll_id)
- return pcoll_buffers[pcoll_id]
+ raise NotImplementedError(buffer_id)
+ return pcoll_buffers[buffer_id]
for k in range(self._bundle_repeat):
try:
@@ -1153,7 +1151,8 @@ class FnApiRunner(runner.PipelineRunner):
for transform_id, timer_writes in stage.timer_pcollections:
windowed_timer_coder_impl = context.coders[
pipeline_components.pcollections[timer_writes].coder_id].get_impl()
- written_timers = get_buffer(b'timers:' + timer_writes.encode('utf-8'))
+ written_timers = get_buffer(
+ create_buffer_id(timer_writes, kind='timers'))
if written_timers:
# Keep only the "last" timer set per key and window.
timers_by_key_and_window = {}
@@ -1636,3 +1635,11 @@ def unique_name(existing, prefix):
return prefix_counter
else:
return prefix
+
+
+def create_buffer_id(name, kind='materialize'):
+ return ('%s:%s' % (kind, name)).encode('utf-8')
+
+
+def split_buffer_id(buffer_id):
+ return buffer_id.decode('utf-8').split(':', 1)
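
One consequence worth noting: under Python 3, str and bytes are distinct
dict keys, so a buffer written under a str id is invisible to a reader
using the bytes form. The old code mixed the two (the side-input path
built 'materialize:' + name as a str, while get_buffer keyed
pcoll_buffers by the bytes transform payload); the helpers rule that out
by construction. A small illustrative sketch, assuming create_buffer_id
as defined above (buffers stands in for the runner's pcoll_buffers):

    buffers = {}

    # Mixed forms, as before this patch: in Python 3 the writer's str
    # key and the reader's bytes key are different dict keys.
    buffers['materialize:pc'] = [b'chunk']
    print(buffers.get(b'materialize:pc'))    # None -- lookup misses

    # Routed through the helper, writer and reader agree on bytes:
    buffers[create_buffer_id('pc')] = [b'chunk']
    print(buffers[create_buffer_id('pc')])   # [b'chunk']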