You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/03/30 15:22:49 UTC
[1/2] beam git commit: Translate pipeline graph to and from Runner
API protos.
Repository: beam
Updated Branches:
refs/heads/master 22d368b40 -> ffd87553f
Translate pipeline graph to and from Runner API protos.
There are some caveats:
* Specific known transforms, with their payloads, are not yet translated.
* Side inputs are not yet supported.
All pipelines without side inputs are round-tripped through this translation by
default before execution (this can be disabled with run(test_runner_api=False)).
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5bfc21b8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5bfc21b8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5bfc21b8
Branch: refs/heads/master
Commit: 5bfc21b8fcc1eb5b9cae9b02808f15c06a76ca56
Parents: 22d368b
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Mon Mar 20 16:08:37 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 30 08:22:15 2017 -0700
----------------------------------------------------------------------
.../apache_beam/examples/snippets/snippets.py | 3 +-
.../examples/snippets/snippets_test.py | 3 +
sdks/python/apache_beam/pipeline.py | 107 +++++++++++++++++--
sdks/python/apache_beam/pipeline_test.py | 16 +++
sdks/python/apache_beam/pvalue.py | 37 ++++++-
.../runners/dataflow/dataflow_runner.py | 6 +-
.../runners/direct/transform_evaluator.py | 23 ++--
sdks/python/apache_beam/runners/runner.py | 2 +-
sdks/python/apache_beam/utils/urns.py | 2 +
9 files changed, 165 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 5cb5ee5..85ab864 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -683,7 +683,8 @@ def model_custom_source(count):
lines, beam.equal_to(
['line ' + str(number) for number in range(0, count)]))
- p.run().wait_until_finish()
+ # Don't test runner api due to pickling errors.
+ p.run(test_runner_api=False).wait_until_finish()
def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 9635c7a..64f3dfb 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -328,6 +328,9 @@ class TypeHintsTest(unittest.TestCase):
lines = (p | beam.Create(
['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3']))
+ # For pickling
+ global Player # pylint: disable=global-variable-not-assigned
+
# [START type_hints_deterministic_key]
class Player(object):
def __init__(self, team, name):
http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index dc05bd3..be2a79d 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -52,6 +52,7 @@ import os
import shutil
import tempfile
+from google.protobuf import wrappers_pb2
from apache_beam import pvalue
from apache_beam import typehints
from apache_beam.internal import pickler
@@ -59,6 +60,8 @@ from apache_beam.runners import create_runner
from apache_beam.runners import PipelineRunner
from apache_beam.transforms import ptransform
from apache_beam.typehints import TypeCheckError
+from apache_beam.utils import proto_utils
+from apache_beam.utils import urns
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
from apache_beam.utils.pipeline_options import StandardOptions
@@ -151,8 +154,14 @@ class Pipeline(object):
"""Returns the root transform of the transform stack."""
return self.transforms_stack[0]
- def run(self):
+ def run(self, test_runner_api=True):
"""Runs the pipeline. Returns whatever our runner returns after running."""
+
+ # When possible, invoke a round trip through the runner API.
+ if test_runner_api and self._verify_runner_api_compatible():
+ return Pipeline.from_runner_api(
+ self.to_runner_api(), self.runner, self.options).run(False)
+
if self.options.view_as(SetupOptions).save_main_session:
# If this option is chosen, verify we can pickle the main session early.
tmpdir = tempfile.mkdtemp()
@@ -299,6 +308,42 @@ class Pipeline(object):
self.transforms_stack.pop()
return pvalueish_result
+ def _verify_runner_api_compatible(self):
+ class Visitor(PipelineVisitor): # pylint: disable=used-before-assignment
+ ok = True # Really a nonlocal.
+
+ def visit_transform(self, transform_node):
+ if transform_node.side_inputs:
+ Visitor.ok = False
+ self.visit(Visitor())
+ return Visitor.ok
+
+ def to_runner_api(self):
+ from apache_beam.runners import pipeline_context
+ from apache_beam.runners.api import beam_runner_api_pb2
+ context = pipeline_context.PipelineContext()
+ # Mutates context; placing inline would force dependence on
+ # argument evaluation order.
+ root_transform_id = context.transforms.get_id(self._root_transform())
+ proto = beam_runner_api_pb2.Pipeline(
+ root_transform_id=root_transform_id,
+ components=context.to_runner_api())
+ return proto
+
+ @staticmethod
+ def from_runner_api(proto, runner, options):
+ p = Pipeline(runner=runner, options=options)
+ from apache_beam.runners import pipeline_context
+ context = pipeline_context.PipelineContext(proto.components)
+ p.transforms_stack = [
+ context.transforms.get_by_id(proto.root_transform_id)]
+ # TODO(robertwb): These are only needed to continue construction. Omit?
+ p.applied_labels = set([
+ t.unique_name for t in proto.components.transforms.values()])
+ for id in proto.components.pcollections:
+ context.pcollections.get_by_id(id).pipeline = p
+ return p
+
class PipelineVisitor(object):
"""Visitor pattern class used to traverse a DAG of transforms.
@@ -374,12 +419,16 @@ class AppliedPTransform(object):
real_producer(side_input).refcounts[side_input.tag] += 1
def add_output(self, output, tag=None):
- assert (isinstance(output, pvalue.PValue) or
- isinstance(output, pvalue.DoOutputsTuple))
- if tag is None:
- tag = len(self.outputs)
- assert tag not in self.outputs
- self.outputs[tag] = output
+ if isinstance(output, pvalue.DoOutputsTuple):
+ self.add_output(output[output._main_tag])
+ elif isinstance(output, pvalue.PValue):
+ # TODO(BEAM-1833): Require tags when calling this method.
+ if tag is None and None in self.outputs:
+ tag = len(self.outputs)
+ assert tag not in self.outputs
+ self.outputs[tag] = output
+ else:
+ raise TypeError("Unexpected output type: %s" % output)
def add_part(self, part):
assert isinstance(part, AppliedPTransform)
@@ -441,3 +490,47 @@ class AppliedPTransform(object):
if v not in visited:
visited.add(v)
visitor.visit_value(v, self)
+
+ def named_inputs(self):
+ # TODO(BEAM-1833): Push names up into the sdk construction.
+ return {str(ix): input for ix, input in enumerate(self.inputs)
+ if isinstance(input, pvalue.PCollection)}
+
+ def to_runner_api(self, context):
+ from apache_beam.runners.api import beam_runner_api_pb2
+ return beam_runner_api_pb2.PTransform(
+ unique_name=self.full_label,
+ spec=beam_runner_api_pb2.UrnWithParameter(
+ urn=urns.PICKLED_TRANSFORM,
+ parameter=proto_utils.pack_Any(
+ wrappers_pb2.BytesValue(value=pickler.dumps(self.transform)))),
+ subtransforms=[context.transforms.get_id(part) for part in self.parts],
+ # TODO(BEAM-115): Side inputs.
+ inputs={tag: context.pcollections.get_id(pc)
+ for tag, pc in self.named_inputs().items()},
+ outputs={str(tag): context.pcollections.get_id(out)
+ for tag, out in self.outputs.items()},
+ # TODO(BEAM-115): display_data
+ display_data=None)
+
+ @staticmethod
+ def from_runner_api(proto, context):
+ result = AppliedPTransform(
+ parent=None,
+ transform=pickler.loads(
+ proto_utils.unpack_Any(proto.spec.parameter,
+ wrappers_pb2.BytesValue).value),
+ full_label=proto.unique_name,
+ inputs=[
+ context.pcollections.get_by_id(id) for id in proto.inputs.values()])
+ result.parts = [
+ context.transforms.get_by_id(id) for id in proto.subtransforms]
+ result.outputs = {
+ None if tag == 'None' else tag: context.pcollections.get_by_id(id)
+ for tag, id in proto.outputs.items()}
+ if not result.parts:
+ for tag, pc in result.outputs.items():
+ if pc not in result.inputs:
+ pc.producer = result
+ pc.tag = tag
+ return result
http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index d80a4e8..d464fdb 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -24,6 +24,7 @@ import unittest
# TODO(BEAM-1555): Test is failing on the service, with FakeSource.
# from nose.plugins.attrib import attr
+import apache_beam as beam
from apache_beam.metrics import Metrics
from apache_beam.pipeline import Pipeline
from apache_beam.pipeline import PipelineOptions
@@ -439,6 +440,21 @@ class PipelineOptionsTest(unittest.TestCase):
if not attr.startswith('_')]))
+class RunnerApiTest(unittest.TestCase):
+
+ def test_simple(self):
+ """Tests serializing, deserializing, and running a simple pipeline.
+
+ More extensive tests are done at pipeline.run for each suitable test.
+ """
+ p = beam.Pipeline()
+ p | beam.Create([None]) | beam.Map(lambda x: x) # pylint: disable=expression-not-assigned
+ proto = p.to_runner_api()
+
+ p2 = Pipeline.from_runner_api(proto, p.runner, p.options)
+ p2.run()
+
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/pvalue.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index 6dc67b0..4114b3f 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -99,6 +99,13 @@ class PCollection(PValue):
"""Initializes a PCollection. Do not call directly."""
super(PCollection, self).__init__(pipeline, **kwargs)
+ def __eq__(self, other):
+ if isinstance(other, PCollection):
+ return self.tag == other.tag and self.producer == other.producer
+
+ def __hash__(self):
+ return hash((self.tag, self.producer))
+
@property
def windowing(self):
if not hasattr(self, '_windowing'):
@@ -112,6 +119,24 @@ class PCollection(PValue):
# of a closure).
return _InvalidUnpickledPCollection, ()
+ def to_runner_api(self, context):
+ from apache_beam.runners.api import beam_runner_api_pb2
+ from apache_beam.internal import pickler
+ return beam_runner_api_pb2.PCollection(
+ unique_name='%d%s.%s' % (
+ len(self.producer.full_label), self.producer.full_label, self.tag),
+ coder_id=pickler.dumps(self.element_type),
+ is_bounded=beam_runner_api_pb2.BOUNDED,
+ windowing_strategy_id=context.windowing_strategies.get_id(
+ self.windowing))
+
+ @staticmethod
+ def from_runner_api(proto, context):
+ from apache_beam.internal import pickler
+ # Producer and tag will be filled in later, the key point is that the
+ # same object is returned for the same pcollection id.
+ return PCollection(None, element_type=pickler.loads(proto.coder_id))
+
class _InvalidUnpickledPCollection(object):
pass
@@ -183,7 +208,8 @@ class DoOutputsTuple(object):
tag = None
elif self._tags and tag not in self._tags:
raise ValueError(
- 'Tag %s is neither the main tag %s nor any of the side tags %s' % (
+ "Tag '%s' is neither the main tag '%s' "
+ "nor any of the side tags %s" % (
tag, self._main_tag, self._tags))
# Check if we accessed this tag before.
if tag in self._pcolls:
@@ -194,14 +220,15 @@ class DoOutputsTuple(object):
pcoll = PCollection(self._pipeline, tag=tag)
# Transfer the producer from the DoOutputsTuple to the resulting
# PCollection.
- pcoll.producer = self.producer
+ pcoll.producer = self.producer.parts[0]
# Add this as an output to both the inner ParDo and the outer _MultiParDo
# PTransforms.
- self.producer.parts[0].add_output(pcoll, tag)
- self.producer.add_output(pcoll, tag)
+ if tag not in self.producer.parts[0].outputs:
+ self.producer.parts[0].add_output(pcoll, tag)
+ self.producer.add_output(pcoll, tag)
else:
# Main output is output of inner ParDo.
- pcoll = self.producer.parts[0].outputs[0]
+ pcoll = self.producer.parts[0].outputs[None]
self._pcolls[tag] = pcoll
return pcoll
http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index bd29d63..a82671c 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -219,9 +219,9 @@ class DataflowRunner(PipelineRunner):
def _get_encoded_output_coder(self, transform_node, window_value=True):
"""Returns the cloud encoding of the coder for the output of a transform."""
if (len(transform_node.outputs) == 1
- and transform_node.outputs[0].element_type is not None):
+ and transform_node.outputs[None].element_type is not None):
# TODO(robertwb): Handle type hints for multi-output transforms.
- element_type = transform_node.outputs[0].element_type
+ element_type = transform_node.outputs[None].element_type
else:
# TODO(silviuc): Remove this branch (and assert) when typehints are
# propagated everywhere. Returning an 'Any' as type hint will trigger
@@ -229,7 +229,7 @@ class DataflowRunner(PipelineRunner):
element_type = typehints.Any
if window_value:
window_coder = (
- transform_node.outputs[0].windowing.windowfn.get_window_coder())
+ transform_node.outputs[None].windowing.windowfn.get_window_coder())
else:
window_coder = None
return self._get_typehint_based_encoding(
http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 6ae5697..f9a0692 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -323,23 +323,12 @@ class _ParDoEvaluator(_TransformEvaluator):
transform = self._applied_ptransform.transform
self._tagged_receivers = _TaggedReceivers(self._evaluation_context)
- if isinstance(self._applied_ptransform.parent.transform, core._MultiParDo): # pylint: disable=protected-access
- do_outputs_tuple = self._applied_ptransform.parent.outputs[0]
- assert isinstance(do_outputs_tuple, pvalue.DoOutputsTuple)
- main_output_pcollection = do_outputs_tuple[do_outputs_tuple._main_tag] # pylint: disable=protected-access
-
- for side_output_tag in transform.side_output_tags:
- output_pcollection = do_outputs_tuple[side_output_tag]
- self._tagged_receivers[side_output_tag] = (
- self._evaluation_context.create_bundle(output_pcollection))
- self._tagged_receivers[side_output_tag].tag = side_output_tag
- else:
- assert len(self._outputs) == 1
- main_output_pcollection = list(self._outputs)[0]
-
- self._tagged_receivers[None] = self._evaluation_context.create_bundle(
- main_output_pcollection)
- self._tagged_receivers[None].tag = None # main_tag is None.
+ for side_output_tag in self._applied_ptransform.outputs:
+ output_pcollection = pvalue.PCollection(None, tag=side_output_tag)
+ output_pcollection.producer = self._applied_ptransform
+ self._tagged_receivers[side_output_tag] = (
+ self._evaluation_context.create_bundle(output_pcollection))
+ self._tagged_receivers[side_output_tag].tag = side_output_tag
self._counter_factory = counters.CounterFactory()
http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index bae21c1..b203c8b 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -222,7 +222,7 @@ class PValueCache(object):
return len(self._cache)
def to_cache_key(self, transform, tag):
- return str((id(transform), tag))
+ return transform.full_label, tag
def _ensure_pvalue_has_real_producer(self, pvalue):
"""Ensure the passed-in PValue has the real_producer attribute.
http://git-wip-us.apache.org/repos/asf/beam/blob/5bfc21b8/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 936e2cb..88fca09 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -22,3 +22,5 @@ SLIDING_WINDOWS_FN = "beam:window_fn:sliding_windows:v0.1"
SESSION_WINDOWS_FN = "beam:window_fn:session_windows:v0.1"
PICKLED_CODER = "beam:coder:pickled_python:v0.1"
+
+PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
[2/2] beam git commit: Closes #2296
Posted by ro...@apache.org.
Closes #2296
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ffd87553
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ffd87553
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ffd87553
Branch: refs/heads/master
Commit: ffd87553f3c3c5fc96f1b964d5557cfcd00492f8
Parents: 22d368b 5bfc21b
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Mar 30 08:22:16 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 30 08:22:16 2017 -0700
----------------------------------------------------------------------
.../apache_beam/examples/snippets/snippets.py | 3 +-
.../examples/snippets/snippets_test.py | 3 +
sdks/python/apache_beam/pipeline.py | 107 +++++++++++++++++--
sdks/python/apache_beam/pipeline_test.py | 16 +++
sdks/python/apache_beam/pvalue.py | 37 ++++++-
.../runners/dataflow/dataflow_runner.py | 6 +-
.../runners/direct/transform_evaluator.py | 23 ++--
sdks/python/apache_beam/runners/runner.py | 2 +-
sdks/python/apache_beam/utils/urns.py | 2 +
9 files changed, 165 insertions(+), 34 deletions(-)
----------------------------------------------------------------------