You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/28 17:04:20 UTC
[1/2] beam git commit: Closes #3764
Repository: beam
Updated Branches:
refs/heads/master e6d5e0887 -> aef89dea8
Closes #3764
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aef89dea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aef89dea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aef89dea
Branch: refs/heads/master
Commit: aef89dea884319bc12885286a3a705e24794de48
Parents: e6d5e08 08a4487
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Mon Aug 28 10:03:52 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Mon Aug 28 10:03:52 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coders.py | 10 ++
.../runners/portability/fn_api_runner.py | 99 ++++++++++++++++++--
2 files changed, 100 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Wrap unknown coders in LengthPrefixCoder.
Posted by ro...@apache.org.
Wrap unknown coders in LengthPrefixCoder.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/08a44874
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/08a44874
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/08a44874
Branch: refs/heads/master
Commit: 08a448743e3b53e055d0ccf1983b5d128c5c0692
Parents: e6d5e08
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Aug 24 11:01:20 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Mon Aug 28 10:03:52 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coders.py | 10 ++
.../runners/portability/fn_api_runner.py | 99 ++++++++++++++++++--
2 files changed, 100 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/08a44874/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index e204369..10fb07b 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -707,6 +707,16 @@ class TupleCoder(FastCoder):
def __hash__(self):
return hash(self._coders)
+ def to_runner_api_parameter(self, context):
+ if self.is_kv_coder():
+ return urns.KV_CODER, None, self.coders()
+ else:
+ return super(TupleCoder, self).to_runner_api_parameter(context)
+
+ @Coder.register_urn(urns.KV_CODER, None)
+ def from_runner_api_parameter(unused_payload, components, unused_context):
+ return TupleCoder(components)
+
class TupleSequenceCoder(FastCoder):
"""Coder of homogeneous tuple objects."""
http://git-wip-us.apache.org/repos/asf/beam/blob/08a44874/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 7c0c06f..c9b3d9a 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -122,7 +122,7 @@ OLDE_SOURCE_SPLITTABLE_DOFN_DATA = pickler.dumps(
class _GroupingBuffer(object):
"""Used to accumulate grouped (shuffled) results."""
def __init__(self, pre_grouped_coder, post_grouped_coder):
- self._key_coder = pre_grouped_coder.value_coder().key_coder()
+ self._key_coder = pre_grouped_coder.key_coder()
self._pre_grouped_coder = pre_grouped_coder
self._post_grouped_coder = post_grouped_coder
self._table = collections.defaultdict(list)
@@ -249,13 +249,80 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
# Now define the "optimization" phases.
+ safe_coders = {}
+
def expand_gbk(stages):
"""Transforms each GBK into a write followed by a read.
"""
+ good_coder_urns = set(beam.coders.Coder._known_urns.keys()) - set([
+ urns.PICKLED_CODER])
+ coders = pipeline_components.coders
+
+ for coder_id, coder_proto in coders.items():
+ if coder_proto.spec.spec.urn == urns.BYTES_CODER:
+ bytes_coder_id = coder_id
+ break
+ else:
+ bytes_coder_id = unique_name(coders, 'bytes_coder')
+ pipeline_components.coders[bytes_coder_id].CopyFrom(
+ beam.coders.BytesCoder().to_runner_api(None))
+
+ coder_substitutions = {}
+
+ def wrap_unknown_coders(coder_id, with_bytes):
+ if (coder_id, with_bytes) not in coder_substitutions:
+ wrapped_coder_id = None
+ coder_proto = coders[coder_id]
+ if coder_proto.spec.spec.urn == urns.LENGTH_PREFIX_CODER:
+ coder_substitutions[coder_id, with_bytes] = (
+ bytes_coder_id if with_bytes else coder_id)
+ elif coder_proto.spec.spec.urn in good_coder_urns:
+ wrapped_components = [wrap_unknown_coders(c, with_bytes)
+ for c in coder_proto.component_coder_ids]
+ if wrapped_components == list(coder_proto.component_coder_ids):
+ # Use as is.
+ coder_substitutions[coder_id, with_bytes] = coder_id
+ else:
+ wrapped_coder_id = unique_name(
+ coders,
+ coder_id + ("_bytes" if with_bytes else "_len_prefix"))
+ coders[wrapped_coder_id].CopyFrom(coder_proto)
+ coders[wrapped_coder_id].component_coder_ids[:] = [
+ wrap_unknown_coders(c, with_bytes)
+ for c in coder_proto.component_coder_ids]
+ coder_substitutions[coder_id, with_bytes] = wrapped_coder_id
+ else:
+ # Not a known coder.
+ if with_bytes:
+ coder_substitutions[coder_id, with_bytes] = bytes_coder_id
+ else:
+ wrapped_coder_id = unique_name(coders, coder_id + "_len_prefix")
+ len_prefix_coder_proto = beam_runner_api_pb2.Coder(
+ spec=beam_runner_api_pb2.SdkFunctionSpec(
+ spec=beam_runner_api_pb2.FunctionSpec(
+ urn=urns.LENGTH_PREFIX_CODER)),
+ component_coder_ids=[coder_id])
+ coders[wrapped_coder_id].CopyFrom(len_prefix_coder_proto)
+ coder_substitutions[coder_id, with_bytes] = wrapped_coder_id
+ # This operation is idempotent.
+ if wrapped_coder_id:
+ coder_substitutions[wrapped_coder_id, with_bytes] = wrapped_coder_id
+ return coder_substitutions[coder_id, with_bytes]
+
+ def fix_pcoll_coder(pcoll):
+ new_coder_id = wrap_unknown_coders(pcoll.coder_id, False)
+ safe_coders[new_coder_id] = wrap_unknown_coders(pcoll.coder_id, True)
+ pcoll.coder_id = new_coder_id
+
for stage in stages:
assert len(stage.transforms) == 1
transform = stage.transforms[0]
if transform.spec.urn == urns.GROUP_BY_KEY_ONLY_TRANSFORM:
+ for pcoll_id in transform.inputs.values():
+ fix_pcoll_coder(pipeline_components.pcollections[pcoll_id])
+ for pcoll_id in transform.outputs.values():
+ fix_pcoll_coder(pipeline_components.pcollections[pcoll_id])
+
# This is used later to correlate the read and write.
param = str("group:%s" % stage.name)
gbk_write = Stage(
@@ -547,9 +614,9 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
logging.debug('Stages: %s', [str(s) for s in stages])
# Return the (possibly mutated) context and ordered set of stages.
- return pipeline_components, stages
+ return pipeline_components, stages, safe_coders
- def run_stages(self, pipeline_components, stages, direct=True):
+ def run_stages(self, pipeline_components, stages, safe_coders, direct=True):
if direct:
controller = FnApiRunner.DirectController()
@@ -559,13 +626,15 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
try:
pcoll_buffers = collections.defaultdict(list)
for stage in stages:
- self.run_stage(controller, pipeline_components, stage, pcoll_buffers)
+ self.run_stage(
+ controller, pipeline_components, stage, pcoll_buffers, safe_coders)
finally:
controller.close()
return maptask_executor_runner.WorkerRunnerResult(PipelineState.DONE)
- def run_stage(self, controller, pipeline_components, stage, pcoll_buffers):
+ def run_stage(
+ self, controller, pipeline_components, stage, pcoll_buffers, safe_coders):
coders = pipeline_context.PipelineContext(pipeline_components).coders
data_operation_spec = controller.data_operation_spec()
@@ -666,10 +735,10 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
original_gbk_transform]
input_pcoll = only_element(transform_proto.inputs.values())
output_pcoll = only_element(transform_proto.outputs.values())
- pre_gbk_coder = coders[
- pipeline_components.pcollections[input_pcoll].coder_id]
- post_gbk_coder = coders[
- pipeline_components.pcollections[output_pcoll].coder_id]
+ pre_gbk_coder = coders[safe_coders[
+ pipeline_components.pcollections[input_pcoll].coder_id]]
+ post_gbk_coder = coders[safe_coders[
+ pipeline_components.pcollections[output_pcoll].coder_id]]
pcoll_buffers[pcoll_id] = _GroupingBuffer(
pre_gbk_coder, post_gbk_coder)
pcoll_buffers[pcoll_id].append(output.data)
@@ -1000,3 +1069,15 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
def only_element(iterable):
element, = iterable
return element
+
+
+def unique_name(existing, prefix):
+ if prefix in existing:
+ counter = 0
+ while True:
+ counter += 1
+ prefix_counter = prefix + "_%s" % counter
+ if prefix_counter not in existing:
+ return prefix_counter
+ else:
+ return prefix