Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/28 17:04:21 UTC

[2/2] beam git commit: Wrap unknown coders in LengthPrefixCoder.

Wrap unknown coders in LengthPrefixCoder.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/08a44874
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/08a44874
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/08a44874

Branch: refs/heads/master
Commit: 08a448743e3b53e055d0ccf1983b5d128c5c0692
Parents: e6d5e08
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Aug 24 11:01:20 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Mon Aug 28 10:03:52 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coders.py        | 10 ++
 .../runners/portability/fn_api_runner.py        | 99 ++++++++++++++++++--
 2 files changed, 100 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
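
The motivation, roughly: a portable runner must shuffle elements whose
coders it cannot instantiate (for example, pickled Python coders). Wrapping
such a coder in LengthPrefixCoder frames every encoded element with a
varint length, so the runner can delimit the payload and pass it around as
opaque bytes. A minimal sketch of that framing using the SDK's public
coders (the sample value is illustrative, not from this commit):

    from apache_beam.coders import coders

    unknown = coders.PickleCoder()               # opaque to a non-Python runner
    framed = coders.LengthPrefixCoder(unknown)   # varint length, then payload

    value = ('key', [1, 2, 3])
    encoded = framed.encode(value)
    assert framed.decode(encoded) == value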


http://git-wip-us.apache.org/repos/asf/beam/blob/08a44874/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index e204369..10fb07b 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -707,6 +707,16 @@ class TupleCoder(FastCoder):
   def __hash__(self):
     return hash(self._coders)
 
+  def to_runner_api_parameter(self, context):
+    if self.is_kv_coder():
+      return urns.KV_CODER, None, self.coders()
+    else:
+      return super(TupleCoder, self).to_runner_api_parameter(context)
+
+  @Coder.register_urn(urns.KV_CODER, None)
+  def from_runner_api_parameter(unused_payload, components, unused_context):
+    return TupleCoder(components)
+
 
 class TupleSequenceCoder(FastCoder):
   """Coder of homogeneous tuple objects."""

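The hunk above also matters for grouping: a two-element TupleCoder is now
advertised under the standard KV coder urn rather than as an opaque pickled
coder, so a runner can see the key/value split it needs for GroupByKey. A
small sketch of the distinction the code relies on (assertions are
illustrative, not from the commit):

    from apache_beam.coders import coders

    kv = coders.TupleCoder((coders.StrUtf8Coder(), coders.VarIntCoder()))
    assert kv.is_kv_coder()                        # exactly two components
    assert kv.key_coder() == coders.StrUtf8Coder()

    # A three-element tuple is not a KV pair, so it falls back to the
    # default representation via the superclass.
    assert not coders.TupleCoder((coders.VarIntCoder(),) * 3).is_kv_coder()
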
http://git-wip-us.apache.org/repos/asf/beam/blob/08a44874/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 7c0c06f..c9b3d9a 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -122,7 +122,7 @@ OLDE_SOURCE_SPLITTABLE_DOFN_DATA = pickler.dumps(
 class _GroupingBuffer(object):
   """Used to accumulate groupded (shuffled) results."""
   def __init__(self, pre_grouped_coder, post_grouped_coder):
-    self._key_coder = pre_grouped_coder.value_coder().key_coder()
+    self._key_coder = pre_grouped_coder.key_coder()
     self._pre_grouped_coder = pre_grouped_coder
     self._post_grouped_coder = post_grouped_coder
     self._table = collections.defaultdict(list)
@@ -249,13 +249,80 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
 
     # Now define the "optimization" phases.
 
+    safe_coders = {}
+
     def expand_gbk(stages):
       """Transforms each GBK into a write followed by a read.
       """
+      good_coder_urns = set(beam.coders.Coder._known_urns.keys()) - set([
+          urns.PICKLED_CODER])
+      coders = pipeline_components.coders
+
+      for coder_id, coder_proto in coders.items():
+        if coder_proto.spec.spec.urn == urns.BYTES_CODER:
+          bytes_coder_id = coder_id
+          break
+      else:
+        bytes_coder_id = unique_name(coders, 'bytes_coder')
+        pipeline_components.coders[bytes_coder_id].CopyFrom(
+            beam.coders.BytesCoder().to_runner_api(None))
+
+      coder_substitutions = {}
+
+      def wrap_unknown_coders(coder_id, with_bytes):
+        if (coder_id, with_bytes) not in coder_substitutions:
+          wrapped_coder_id = None
+          coder_proto = coders[coder_id]
+          if coder_proto.spec.spec.urn == urns.LENGTH_PREFIX_CODER:
+            coder_substitutions[coder_id, with_bytes] = (
+                bytes_coder_id if with_bytes else coder_id)
+          elif coder_proto.spec.spec.urn in good_coder_urns:
+            wrapped_components = [wrap_unknown_coders(c, with_bytes)
+                                  for c in coder_proto.component_coder_ids]
+            if wrapped_components == list(coder_proto.component_coder_ids):
+              # Use as is.
+              coder_substitutions[coder_id, with_bytes] = coder_id
+            else:
+              wrapped_coder_id = unique_name(
+                  coders,
+                  coder_id + ("_bytes" if with_bytes else "_len_prefix"))
+              coders[wrapped_coder_id].CopyFrom(coder_proto)
+              coders[wrapped_coder_id].component_coder_ids[:] = [
+                  wrap_unknown_coders(c, with_bytes)
+                  for c in coder_proto.component_coder_ids]
+              coder_substitutions[coder_id, with_bytes] = wrapped_coder_id
+          else:
+            # Not a known coder.
+            if with_bytes:
+              coder_substitutions[coder_id, with_bytes] = bytes_coder_id
+            else:
+              wrapped_coder_id = unique_name(coders, coder_id + "_len_prefix")
+              len_prefix_coder_proto = beam_runner_api_pb2.Coder(
+                  spec=beam_runner_api_pb2.SdkFunctionSpec(
+                      spec=beam_runner_api_pb2.FunctionSpec(
+                          urn=urns.LENGTH_PREFIX_CODER)),
+                  component_coder_ids=[coder_id])
+              coders[wrapped_coder_id].CopyFrom(len_prefix_coder_proto)
+              coder_substitutions[coder_id, with_bytes] = wrapped_coder_id
+          # This operation is idempotent.
+          if wrapped_coder_id:
+            coder_substitutions[wrapped_coder_id, with_bytes] = wrapped_coder_id
+        return coder_substitutions[coder_id, with_bytes]
+
+      def fix_pcoll_coder(pcoll):
+        new_coder_id = wrap_unknown_coders(pcoll.coder_id, False)
+        safe_coders[new_coder_id] = wrap_unknown_coders(pcoll.coder_id, True)
+        pcoll.coder_id = new_coder_id
+
       for stage in stages:
         assert len(stage.transforms) == 1
         transform = stage.transforms[0]
         if transform.spec.urn == urns.GROUP_BY_KEY_ONLY_TRANSFORM:
+          for pcoll_id in transform.inputs.values():
+            fix_pcoll_coder(pipeline_components.pcollections[pcoll_id])
+          for pcoll_id in transform.outputs.values():
+            fix_pcoll_coder(pipeline_components.pcollections[pcoll_id])
+
           # This is used later to correlate the read and write.
           param = str("group:%s" % stage.name)
           gbk_write = Stage(
@@ -547,9 +614,9 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
       logging.debug('Stages: %s', [str(s) for s in stages])
 
     # Return the (possibly mutated) context and ordered set of stages.
-    return pipeline_components, stages
+    return pipeline_components, stages, safe_coders
 
-  def run_stages(self, pipeline_components, stages, direct=True):
+  def run_stages(self, pipeline_components, stages, safe_coders, direct=True):
 
     if direct:
       controller = FnApiRunner.DirectController()
@@ -559,13 +626,15 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
     try:
       pcoll_buffers = collections.defaultdict(list)
       for stage in stages:
-        self.run_stage(controller, pipeline_components, stage, pcoll_buffers)
+        self.run_stage(
+            controller, pipeline_components, stage, pcoll_buffers, safe_coders)
     finally:
       controller.close()
 
     return maptask_executor_runner.WorkerRunnerResult(PipelineState.DONE)
 
-  def run_stage(self, controller, pipeline_components, stage, pcoll_buffers):
+  def run_stage(
+      self, controller, pipeline_components, stage, pcoll_buffers, safe_coders):
 
     coders = pipeline_context.PipelineContext(pipeline_components).coders
     data_operation_spec = controller.data_operation_spec()
@@ -666,10 +735,10 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
                 original_gbk_transform]
             input_pcoll = only_element(transform_proto.inputs.values())
             output_pcoll = only_element(transform_proto.outputs.values())
-            pre_gbk_coder = coders[
-                pipeline_components.pcollections[input_pcoll].coder_id]
-            post_gbk_coder = coders[
-                pipeline_components.pcollections[output_pcoll].coder_id]
+            pre_gbk_coder = coders[safe_coders[
+                pipeline_components.pcollections[input_pcoll].coder_id]]
+            post_gbk_coder = coders[safe_coders[
+                pipeline_components.pcollections[output_pcoll].coder_id]]
             pcoll_buffers[pcoll_id] = _GroupingBuffer(
                 pre_gbk_coder, post_gbk_coder)
           pcoll_buffers[pcoll_id].append(output.data)
@@ -1000,3 +1069,15 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
 def only_element(iterable):
   element, = iterable
   return element
+
+
+def unique_name(existing, prefix):
+  if prefix in existing:
+    counter = 0
+    while True:
+      counter += 1
+      prefix_counter = prefix + "_%s" % counter
+      if prefix_counter not in existing:
+        return prefix_counter
+  else:
+    return prefix
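

Taken together: for each PCollection feeding into or out of a GroupByKey,
wrap_unknown_coders(coder_id, False) yields the length-prefixed coder the
SDK encodes with, and wrap_unknown_coders(coder_id, True) yields the "safe"
twin in which every unknown leaf is replaced by a bytes coder. The two
coders read and write byte-identical streams, which is what lets the
runner's _GroupingBuffer regroup elements it cannot fully decode. A hedged
round-trip sketch of that pairing, with PickleCoder standing in for an
arbitrary unknown coder:

    from apache_beam.coders import coders

    unknown = coders.PickleCoder()
    wire_coder = coders.LengthPrefixCoder(unknown)              # SDK side
    safe_coder = coders.LengthPrefixCoder(coders.BytesCoder())  # runner side

    value = {'any': 'python object'}
    encoded = wire_coder.encode(value)

    # The runner delimits and round-trips the element without decoding it...
    payload = safe_coder.decode(encoded)
    assert safe_coder.encode(payload) == encoded

    # ...and the framed payload is exactly the unknown coder's encoding.
    assert unknown.decode(payload) == value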