You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/03/30 15:22:49 UTC

[1/2] beam git commit: Translate pipeline graph to and from Runner API protos.

Repository: beam
Updated Branches:
  refs/heads/master 22d368b40 -> ffd87553f


Translate pipeline graph to and from Runner API protos.

There are some caveats:
  * Specific known transforms, with their payloads, are not yet translated.
  * Side inputs are not yet supported.

All pipelines without side inputs are passed through this translation by default
before execution.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5bfc21b8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5bfc21b8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5bfc21b8

Branch: refs/heads/master
Commit: 5bfc21b8fcc1eb5b9cae9b02808f15c06a76ca56
Parents: 22d368b
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Mon Mar 20 16:08:37 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 30 08:22:15 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/snippets/snippets.py   |   3 +-
 .../examples/snippets/snippets_test.py          |   3 +
 sdks/python/apache_beam/pipeline.py             | 107 +++++++++++++++++--
 sdks/python/apache_beam/pipeline_test.py        |  16 +++
 sdks/python/apache_beam/pvalue.py               |  37 ++++++-
 .../runners/dataflow/dataflow_runner.py         |   6 +-
 .../runners/direct/transform_evaluator.py       |  23 ++--
 sdks/python/apache_beam/runners/runner.py       |   2 +-
 sdks/python/apache_beam/utils/urns.py           |   2 +
 9 files changed, 165 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 5cb5ee5..85ab864 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -683,7 +683,8 @@ def model_custom_source(count):
       lines, beam.equal_to(
           ['line ' + str(number) for number in range(0, count)]))
 
-  p.run().wait_until_finish()
+  # Don't test runner api due to pickling errors.
+  p.run(test_runner_api=False).wait_until_finish()
 
 
 def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,

http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 9635c7a..64f3dfb 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -328,6 +328,9 @@ class TypeHintsTest(unittest.TestCase):
     lines = (p | beam.Create(
         ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3']))
 
+    # For pickling
+    global Player  # pylint: disable=global-variable-not-assigned
+
     # [START type_hints_deterministic_key]
     class Player(object):
       def __init__(self, team, name):

http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index dc05bd3..be2a79d 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -52,6 +52,7 @@ import os
 import shutil
 import tempfile
 
+from google.protobuf import wrappers_pb2
 from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.internal import pickler
@@ -59,6 +60,8 @@ from apache_beam.runners import create_runner
 from apache_beam.runners import PipelineRunner
 from apache_beam.transforms import ptransform
 from apache_beam.typehints import TypeCheckError
+from apache_beam.utils import proto_utils
+from apache_beam.utils import urns
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 from apache_beam.utils.pipeline_options import StandardOptions
@@ -151,8 +154,14 @@ class Pipeline(object):
     """Returns the root transform of the transform stack."""
     return self.transforms_stack[0]
 
-  def run(self):
+  def run(self, test_runner_api=True):
     """Runs the pipeline. Returns whatever our runner returns after running."""
+
+    # When possible, invoke a round trip through the runner API.
+    if test_runner_api and self._verify_runner_api_compatible():
+      return Pipeline.from_runner_api(
+          self.to_runner_api(), self.runner, self.options).run(False)
+
     if self.options.view_as(SetupOptions).save_main_session:
       # If this option is chosen, verify we can pickle the main session early.
       tmpdir = tempfile.mkdtemp()
@@ -299,6 +308,42 @@ class Pipeline(object):
     self.transforms_stack.pop()
     return pvalueish_result
 
+  def _verify_runner_api_compatible(self):
+    class Visitor(PipelineVisitor):  # pylint: disable=used-before-assignment
+      ok = True  # Really a nonlocal.
+
+      def visit_transform(self, transform_node):
+        if transform_node.side_inputs:
+          Visitor.ok = False
+    self.visit(Visitor())
+    return Visitor.ok
+
+  def to_runner_api(self):
+    from apache_beam.runners import pipeline_context
+    from apache_beam.runners.api import beam_runner_api_pb2
+    context = pipeline_context.PipelineContext()
+    # Mutates context; placing inline would force dependence on
+    # argument evaluation order.
+    root_transform_id = context.transforms.get_id(self._root_transform())
+    proto = beam_runner_api_pb2.Pipeline(
+        root_transform_id=root_transform_id,
+        components=context.to_runner_api())
+    return proto
+
+  @staticmethod
+  def from_runner_api(proto, runner, options):
+    p = Pipeline(runner=runner, options=options)
+    from apache_beam.runners import pipeline_context
+    context = pipeline_context.PipelineContext(proto.components)
+    p.transforms_stack = [
+        context.transforms.get_by_id(proto.root_transform_id)]
+    # TODO(robertwb): These are only needed to continue construction. Omit?
+    p.applied_labels = set([
+        t.unique_name for t in proto.components.transforms.values()])
+    for id in proto.components.pcollections:
+      context.pcollections.get_by_id(id).pipeline = p
+    return p
+
 
 class PipelineVisitor(object):
   """Visitor pattern class used to traverse a DAG of transforms.
@@ -374,12 +419,16 @@ class AppliedPTransform(object):
         real_producer(side_input).refcounts[side_input.tag] += 1
 
   def add_output(self, output, tag=None):
-    assert (isinstance(output, pvalue.PValue) or
-            isinstance(output, pvalue.DoOutputsTuple))
-    if tag is None:
-      tag = len(self.outputs)
-    assert tag not in self.outputs
-    self.outputs[tag] = output
+    if isinstance(output, pvalue.DoOutputsTuple):
+      self.add_output(output[output._main_tag])
+    elif isinstance(output, pvalue.PValue):
+      # TODO(BEAM-1833): Require tags when calling this method.
+      if tag is None and None in self.outputs:
+        tag = len(self.outputs)
+      assert tag not in self.outputs
+      self.outputs[tag] = output
+    else:
+      raise TypeError("Unexpected output type: %s" % output)
 
   def add_part(self, part):
     assert isinstance(part, AppliedPTransform)
@@ -441,3 +490,47 @@ class AppliedPTransform(object):
         if v not in visited:
           visited.add(v)
           visitor.visit_value(v, self)
+
+  def named_inputs(self):
+    # TODO(BEAM-1833): Push names up into the sdk construction.
+    return {str(ix): input for ix, input in enumerate(self.inputs)
+            if isinstance(input, pvalue.PCollection)}
+
+  def to_runner_api(self, context):
+    from apache_beam.runners.api import beam_runner_api_pb2
+    return beam_runner_api_pb2.PTransform(
+        unique_name=self.full_label,
+        spec=beam_runner_api_pb2.UrnWithParameter(
+            urn=urns.PICKLED_TRANSFORM,
+            parameter=proto_utils.pack_Any(
+                wrappers_pb2.BytesValue(value=pickler.dumps(self.transform)))),
+        subtransforms=[context.transforms.get_id(part) for part in self.parts],
+        # TODO(BEAM-115): Side inputs.
+        inputs={tag: context.pcollections.get_id(pc)
+                for tag, pc in self.named_inputs().items()},
+        outputs={str(tag): context.pcollections.get_id(out)
+                 for tag, out in self.outputs.items()},
+        # TODO(BEAM-115): display_data
+        display_data=None)
+
+  @staticmethod
+  def from_runner_api(proto, context):
+    result = AppliedPTransform(
+        parent=None,
+        transform=pickler.loads(
+            proto_utils.unpack_Any(proto.spec.parameter,
+                                   wrappers_pb2.BytesValue).value),
+        full_label=proto.unique_name,
+        inputs=[
+            context.pcollections.get_by_id(id) for id in proto.inputs.values()])
+    result.parts = [
+        context.transforms.get_by_id(id) for id in proto.subtransforms]
+    result.outputs = {
+        None if tag == 'None' else tag: context.pcollections.get_by_id(id)
+        for tag, id in proto.outputs.items()}
+    if not result.parts:
+      for tag, pc in result.outputs.items():
+        if pc not in result.inputs:
+          pc.producer = result
+          pc.tag = tag
+    return result

http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index d80a4e8..d464fdb 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -24,6 +24,7 @@ import unittest
 # TODO(BEAM-1555): Test is failing on the service, with FakeSource.
 # from nose.plugins.attrib import attr
 
+import apache_beam as beam
 from apache_beam.metrics import Metrics
 from apache_beam.pipeline import Pipeline
 from apache_beam.pipeline import PipelineOptions
@@ -439,6 +440,21 @@ class PipelineOptionsTest(unittest.TestCase):
              if not attr.startswith('_')]))
 
 
+class RunnerApiTest(unittest.TestCase):
+
+  def test_simple(self):
+    """Tests serializing, deserializing, and running a simple pipeline.
+
+    More extensive tests are done at pipeline.run for each suitable test.
+    """
+    p = beam.Pipeline()
+    p | beam.Create([None]) | beam.Map(lambda x: x)  # pylint: disable=expression-not-assigned
+    proto = p.to_runner_api()
+
+    p2 = Pipeline.from_runner_api(proto, p.runner, p.options)
+    p2.run()
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.DEBUG)
   unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/pvalue.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index 6dc67b0..4114b3f 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -99,6 +99,13 @@ class PCollection(PValue):
     """Initializes a PCollection. Do not call directly."""
     super(PCollection, self).__init__(pipeline, **kwargs)
 
+  def __eq__(self, other):
+    if isinstance(other, PCollection):
+      return self.tag == other.tag and self.producer == other.producer
+
+  def __hash__(self):
+    return hash((self.tag, self.producer))
+
   @property
   def windowing(self):
     if not hasattr(self, '_windowing'):
@@ -112,6 +119,24 @@ class PCollection(PValue):
     # of a closure).
     return _InvalidUnpickledPCollection, ()
 
+  def to_runner_api(self, context):
+    from apache_beam.runners.api import beam_runner_api_pb2
+    from apache_beam.internal import pickler
+    return beam_runner_api_pb2.PCollection(
+        unique_name='%d%s.%s' % (
+            len(self.producer.full_label), self.producer.full_label, self.tag),
+        coder_id=pickler.dumps(self.element_type),
+        is_bounded=beam_runner_api_pb2.BOUNDED,
+        windowing_strategy_id=context.windowing_strategies.get_id(
+            self.windowing))
+
+  @staticmethod
+  def from_runner_api(proto, context):
+    from apache_beam.internal import pickler
+    # Producer and tag will be filled in later, the key point is that the
+    # same object is returned for the same pcollection id.
+    return PCollection(None, element_type=pickler.loads(proto.coder_id))
+
 
 class _InvalidUnpickledPCollection(object):
   pass
@@ -183,7 +208,8 @@ class DoOutputsTuple(object):
       tag = None
     elif self._tags and tag not in self._tags:
       raise ValueError(
-          'Tag %s is neither the main tag %s nor any of the side tags %s' % (
+          "Tag '%s' is neither the main tag '%s' "
+          "nor any of the side tags %s" % (
               tag, self._main_tag, self._tags))
     # Check if we accessed this tag before.
     if tag in self._pcolls:
@@ -194,14 +220,15 @@ class DoOutputsTuple(object):
       pcoll = PCollection(self._pipeline, tag=tag)
       # Transfer the producer from the DoOutputsTuple to the resulting
       # PCollection.
-      pcoll.producer = self.producer
+      pcoll.producer = self.producer.parts[0]
       # Add this as an output to both the inner ParDo and the outer _MultiParDo
       # PTransforms.
-      self.producer.parts[0].add_output(pcoll, tag)
-      self.producer.add_output(pcoll, tag)
+      if tag not in self.producer.parts[0].outputs:
+        self.producer.parts[0].add_output(pcoll, tag)
+        self.producer.add_output(pcoll, tag)
     else:
       # Main output is output of inner ParDo.
-      pcoll = self.producer.parts[0].outputs[0]
+      pcoll = self.producer.parts[0].outputs[None]
     self._pcolls[tag] = pcoll
     return pcoll
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index bd29d63..a82671c 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -219,9 +219,9 @@ class DataflowRunner(PipelineRunner):
   def _get_encoded_output_coder(self, transform_node, window_value=True):
     """Returns the cloud encoding of the coder for the output of a transform."""
     if (len(transform_node.outputs) == 1
-        and transform_node.outputs[0].element_type is not None):
+        and transform_node.outputs[None].element_type is not None):
       # TODO(robertwb): Handle type hints for multi-output transforms.
-      element_type = transform_node.outputs[0].element_type
+      element_type = transform_node.outputs[None].element_type
     else:
       # TODO(silviuc): Remove this branch (and assert) when typehints are
       # propagated everywhere. Returning an 'Any' as type hint will trigger
@@ -229,7 +229,7 @@ class DataflowRunner(PipelineRunner):
       element_type = typehints.Any
     if window_value:
       window_coder = (
-          transform_node.outputs[0].windowing.windowfn.get_window_coder())
+          transform_node.outputs[None].windowing.windowfn.get_window_coder())
     else:
       window_coder = None
     return self._get_typehint_based_encoding(

http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 6ae5697..f9a0692 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -323,23 +323,12 @@ class _ParDoEvaluator(_TransformEvaluator):
     transform = self._applied_ptransform.transform
 
     self._tagged_receivers = _TaggedReceivers(self._evaluation_context)
-    if isinstance(self._applied_ptransform.parent.transform, core._MultiParDo):  # pylint: disable=protected-access
-      do_outputs_tuple = self._applied_ptransform.parent.outputs[0]
-      assert isinstance(do_outputs_tuple, pvalue.DoOutputsTuple)
-      main_output_pcollection = do_outputs_tuple[do_outputs_tuple._main_tag]  # pylint: disable=protected-access
-
-      for side_output_tag in transform.side_output_tags:
-        output_pcollection = do_outputs_tuple[side_output_tag]
-        self._tagged_receivers[side_output_tag] = (
-            self._evaluation_context.create_bundle(output_pcollection))
-        self._tagged_receivers[side_output_tag].tag = side_output_tag
-    else:
-      assert len(self._outputs) == 1
-      main_output_pcollection = list(self._outputs)[0]
-
-    self._tagged_receivers[None] = self._evaluation_context.create_bundle(
-        main_output_pcollection)
-    self._tagged_receivers[None].tag = None  # main_tag is None.
+    for side_output_tag in self._applied_ptransform.outputs:
+      output_pcollection = pvalue.PCollection(None, tag=side_output_tag)
+      output_pcollection.producer = self._applied_ptransform
+      self._tagged_receivers[side_output_tag] = (
+          self._evaluation_context.create_bundle(output_pcollection))
+      self._tagged_receivers[side_output_tag].tag = side_output_tag
 
     self._counter_factory = counters.CounterFactory()
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index bae21c1..b203c8b 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -222,7 +222,7 @@ class PValueCache(object):
     return len(self._cache)
 
   def to_cache_key(self, transform, tag):
-    return str((id(transform), tag))
+    return transform.full_label, tag
 
   def _ensure_pvalue_has_real_producer(self, pvalue):
     """Ensure the passed-in PValue has the real_producer attribute.

http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 936e2cb..88fca09 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -22,3 +22,5 @@ SLIDING_WINDOWS_FN = "beam:window_fn:sliding_windows:v0.1"
 SESSION_WINDOWS_FN = "beam:window_fn:session_windows:v0.1"
 
 PICKLED_CODER = "beam:coder:pickled_python:v0.1"
+
+PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"


[2/2] beam git commit: Closes #2296

Posted by ro...@apache.org.
Closes #2296


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ffd87553
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ffd87553
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ffd87553

Branch: refs/heads/master
Commit: ffd87553f3c3c5fc96f1b964d5557cfcd00492f8
Parents: 22d368b 5bfc21b
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Mar 30 08:22:16 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 30 08:22:16 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/snippets/snippets.py   |   3 +-
 .../examples/snippets/snippets_test.py          |   3 +
 sdks/python/apache_beam/pipeline.py             | 107 +++++++++++++++++--
 sdks/python/apache_beam/pipeline_test.py        |  16 +++
 sdks/python/apache_beam/pvalue.py               |  37 ++++++-
 .../runners/dataflow/dataflow_runner.py         |   6 +-
 .../runners/direct/transform_evaluator.py       |  23 ++--
 sdks/python/apache_beam/runners/runner.py       |   2 +-
 sdks/python/apache_beam/utils/urns.py           |   2 +
 9 files changed, 165 insertions(+), 34 deletions(-)
----------------------------------------------------------------------