You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/04/27 01:14:33 UTC
[1/5] beam git commit: Translate flatten to Runner API.
Repository: beam
Updated Branches:
refs/heads/master 26c61f414 -> 412b610ed
Translate flatten to Runner API.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d096b19a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d096b19a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d096b19a
Branch: refs/heads/master
Commit: d096b19aaf2fa273dd1a13ac7ff96d2e0be030bd
Parents: 97c9b17
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Apr 18 15:51:50 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Apr 26 18:14:12 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/transforms/core.py | 12 ++++++++++++
sdks/python/apache_beam/utils/urns.py | 1 +
2 files changed, 13 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d096b19a/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 4709056..14cc620 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -48,6 +48,7 @@ from apache_beam.typehints import TypeCheckError
from apache_beam.typehints import Union
from apache_beam.typehints import WithTypeHints
from apache_beam.typehints.trivial_inference import element_type
+from apache_beam.utils import urns
from apache_beam.utils.pipeline_options import TypeOptions
@@ -1340,6 +1341,17 @@ class Flatten(PTransform):
return Windowing(GlobalWindows())
return super(Flatten, self).get_windowing(inputs)
+ def to_runner_api_parameter(self, context):
+ return urns.FLATTEN_TRANSFORM, None
+
+ @staticmethod
+ def from_runner_api_parameter(unused_parameter, unused_context):
+ return Flatten()
+
+
+PTransform.register_urn(
+ urns.FLATTEN_TRANSFORM, None, Flatten.from_runner_api_parameter)
+
class Create(PTransform):
"""A transform that creates a PCollection from an iterable."""
http://git-wip-us.apache.org/repos/asf/beam/blob/d096b19a/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 88fca09..d10dd26 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -24,3 +24,4 @@ SESSION_WINDOWS_FN = "beam:window_fn:session_windows:v0.1"
PICKLED_CODER = "beam:coder:pickled_python:v0.1"
PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
+FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1"
[4/5] beam git commit: Factor out URN registration logic.
Posted by ro...@apache.org.
Factor out URN registration logic.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/87a475e4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/87a475e4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/87a475e4
Branch: refs/heads/master
Commit: 87a475e40a346d104e5b30e9e2e3f60b9e56916b
Parents: a95fc19
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Wed Apr 19 11:56:55 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Apr 26 18:14:13 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/transforms/window.py | 90 +++++-----------------
sdks/python/apache_beam/utils/urns.py | 91 +++++++++++++++++++++++
2 files changed, 108 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/87a475e4/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 7e56c23..9c4b109 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -52,10 +52,8 @@ from __future__ import absolute_import
import abc
from google.protobuf import struct_pb2
-from google.protobuf import wrappers_pb2
from apache_beam import coders
-from apache_beam.internal import pickler
from apache_beam.runners.api import beam_runner_api_pb2
from apache_beam.transforms import timeutil
from apache_beam.transforms.timeutil import Duration
@@ -92,7 +90,7 @@ class OutputTimeFn(object):
raise ValueError('Invalid OutputTimeFn: %s.' % output_time_fn)
-class WindowFn(object):
+class WindowFn(urns.RunnerApiFn):
"""An abstract windowing function defining a basic assign and merge."""
__metaclass__ = abc.ABCMeta
@@ -150,39 +148,7 @@ class WindowFn(object):
# By default, just return the input timestamp.
return input_timestamp
- _known_urns = {}
-
- @classmethod
- def register_urn(cls, urn, parameter_type, constructor):
- cls._known_urns[urn] = parameter_type, constructor
-
- @classmethod
- def from_runner_api(cls, fn_proto, context):
- parameter_type, constructor = cls._known_urns[fn_proto.spec.urn]
- return constructor(
- proto_utils.unpack_Any(fn_proto.spec.parameter, parameter_type),
- context)
-
- def to_runner_api(self, context):
- urn, typed_param = self.to_runner_api_parameter(context)
- return beam_runner_api_pb2.SdkFunctionSpec(
- spec=beam_runner_api_pb2.FunctionSpec(
- urn=urn,
- parameter=proto_utils.pack_Any(typed_param)))
-
- @staticmethod
- def from_runner_api_parameter(fn_parameter, unused_context):
- return pickler.loads(fn_parameter.value)
-
- def to_runner_api_parameter(self, context):
- return (urns.PICKLED_WINDOW_FN,
- wrappers_pb2.BytesValue(value=pickler.dumps(self)))
-
-
-WindowFn.register_urn(
- urns.PICKLED_WINDOW_FN,
- wrappers_pb2.BytesValue,
- WindowFn.from_runner_api_parameter)
+ urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_WINDOW_FN)
class BoundedWindow(object):
@@ -315,16 +281,12 @@ class GlobalWindows(NonMergingWindowFn):
def __ne__(self, other):
return not self == other
- @staticmethod
- def from_runner_api_parameter(unused_fn_parameter, unused_context):
- return GlobalWindows()
-
def to_runner_api_parameter(self, context):
return urns.GLOBAL_WINDOWS_FN, None
-
-WindowFn.register_urn(
- urns.GLOBAL_WINDOWS_FN, None, GlobalWindows.from_runner_api_parameter)
+ @urns.RunnerApiFn.register_urn(urns.GLOBAL_WINDOWS_FN, None)
+ def from_runner_api_parameter(unused_fn_parameter, unused_context):
+ return GlobalWindows()
class FixedWindows(NonMergingWindowFn):
@@ -362,22 +324,16 @@ class FixedWindows(NonMergingWindowFn):
def __ne__(self, other):
return not self == other
- @staticmethod
- def from_runner_api_parameter(fn_parameter, unused_context):
- return FixedWindows(
- size=Duration(micros=fn_parameter['size']),
- offset=Timestamp(micros=fn_parameter['offset']))
-
def to_runner_api_parameter(self, context):
return (urns.FIXED_WINDOWS_FN,
proto_utils.pack_Struct(size=self.size.micros,
offset=self.offset.micros))
-
-WindowFn.register_urn(
- urns.FIXED_WINDOWS_FN,
- struct_pb2.Struct,
- FixedWindows.from_runner_api_parameter)
+ @urns.RunnerApiFn.register_urn(urns.FIXED_WINDOWS_FN, struct_pb2.Struct)
+ def from_runner_api_parameter(fn_parameter, unused_context):
+ return FixedWindows(
+ size=Duration(micros=fn_parameter['size']),
+ offset=Timestamp(micros=fn_parameter['offset']))
class SlidingWindows(NonMergingWindowFn):
@@ -419,13 +375,6 @@ class SlidingWindows(NonMergingWindowFn):
and self.offset == other.offset
and self.period == other.period)
- @staticmethod
- def from_runner_api_parameter(fn_parameter, unused_context):
- return SlidingWindows(
- size=Duration(micros=fn_parameter['size']),
- offset=Timestamp(micros=fn_parameter['offset']),
- period=Duration(micros=fn_parameter['period']))
-
def to_runner_api_parameter(self, context):
return (urns.SLIDING_WINDOWS_FN,
proto_utils.pack_Struct(
@@ -433,11 +382,12 @@ class SlidingWindows(NonMergingWindowFn):
offset=self.offset.micros,
period=self.period.micros))
-
-WindowFn.register_urn(
- urns.SLIDING_WINDOWS_FN,
- struct_pb2.Struct,
- SlidingWindows.from_runner_api_parameter)
+ @urns.RunnerApiFn.register_urn(urns.SLIDING_WINDOWS_FN, struct_pb2.Struct)
+ def from_runner_api_parameter(fn_parameter, unused_context):
+ return SlidingWindows(
+ size=Duration(micros=fn_parameter['size']),
+ offset=Timestamp(micros=fn_parameter['offset']),
+ period=Duration(micros=fn_parameter['period']))
class Sessions(WindowFn):
@@ -487,16 +437,10 @@ class Sessions(WindowFn):
if type(self) == type(other) == Sessions:
return self.gap_size == other.gap_size
- @staticmethod
+ @urns.RunnerApiFn.register_urn(urns.SESSION_WINDOWS_FN, struct_pb2.Struct)
def from_runner_api_parameter(fn_parameter, unused_context):
return Sessions(gap_size=Duration(micros=fn_parameter['gap_size']))
def to_runner_api_parameter(self, context):
return (urns.SESSION_WINDOWS_FN,
proto_utils.pack_Struct(gap_size=self.gap_size.micros))
-
-
-WindowFn.register_urn(
- urns.SESSION_WINDOWS_FN,
- struct_pb2.Struct,
- Sessions.from_runner_api_parameter)
http://git-wip-us.apache.org/repos/asf/beam/blob/87a475e4/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index a2f3a3e..46bd8f5 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -15,6 +15,15 @@
# limitations under the License.
#
+import abc
+import inspect
+
+from google.protobuf import wrappers_pb2
+
+from apache_beam.internal import pickler
+from apache_beam.utils import proto_utils
+
+
PICKLED_WINDOW_FN = "beam:window_fn:pickled_python:v0.1"
GLOBAL_WINDOWS_FN = "beam:window_fn:global_windows:v0.1"
FIXED_WINDOWS_FN = "beam:window_fn:fixed_windows:v0.1"
@@ -26,3 +35,85 @@ PICKLED_CODER = "beam:coder:pickled_python:v0.1"
PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1"
WINDOW_INTO_TRANSFORM = "beam:ptransform:window_into:v0.1"
+
+
+class RunnerApiFn(object):
+ """Abstract base class that provides urn registration utilities.
+
+ A class that inherits from this class will get a registration-based
+ from_runner_api and to_runner_api methods that convert to and from
+ beam_runner_api_pb2.SdkFunctionSpec.
+
+ Additionally, register_pickle_urn can be called from the body of a class
+ to register serialization via pickling.
+ """
+
+ __metaclass__ = abc.ABCMeta
+
+ _known_urns = {}
+
+ @abc.abstractmethod
+ def to_runner_api_parameter(self, unused_context):
+ """Returns the urn and payload for this Fn.
+
+ The returned urn(s) should be registered with `register_urn`.
+ """
+ pass
+
+ @classmethod
+ def register_urn(cls, urn, parameter_type, fn=None):
+ """Registers a urn with a constructor.
+
+ For example, if 'beam:fn:foo' had parameter type FooPayload, one could
+ write `RunnerApiFn.register_urn('beam:fn:foo', FooPayload, foo_from_proto)`
+ where foo_from_proto took as arguments a FooPayload and a PipelineContext.
+ This function can also be used as a decorator rather than passing the
+ callable in as the final parameter.
+
+ A corresponding to_runner_api_parameter method is expected to return
+ the tuple ('beam:fn:foo', foo_payload), where foo_payload is a FooPayload.
+ """
+ def register(fn):
+ cls._known_urns[urn] = parameter_type, fn
+ return staticmethod(fn)
+ if fn:
+ # Used as a statement.
+ register(fn)
+ else:
+ # Used as a decorator.
+ return register
+
+ @classmethod
+ def register_pickle_urn(cls, pickle_urn):
+ """Registers and implements the given urn via pickling.
+ """
+ inspect.currentframe().f_back.f_locals['to_runner_api_parameter'] = (
+ lambda self, context: (
+ pickle_urn, wrappers_pb2.BytesValue(value=pickler.dumps(self))))
+ cls.register_urn(
+ pickle_urn,
+ wrappers_pb2.BytesValue,
+ lambda proto, unused_context: pickler.loads(proto.value))
+
+ def to_runner_api(self, context):
+ """Returns an SdkFunctionSpec encoding this Fn.
+
+ Prefer overriding self.to_runner_api_parameter.
+ """
+ from apache_beam.runners.api import beam_runner_api_pb2
+ urn, typed_param = self.to_runner_api_parameter(context)
+ return beam_runner_api_pb2.SdkFunctionSpec(
+ spec=beam_runner_api_pb2.FunctionSpec(
+ urn=urn,
+ parameter=proto_utils.pack_Any(typed_param)))
+
+ @classmethod
+ def from_runner_api(cls, fn_proto, context):
+ """Converts from an SdkFunctionSpec to a Fn object.
+
+ Prefer registering a urn with its parameter type and constructor.
+ """
+ parameter_type, constructor = cls._known_urns[fn_proto.spec.urn]
+ return constructor(
+ proto_utils.unpack_Any(fn_proto.spec.parameter, parameter_type),
+ context)
[5/5] beam git commit: Closes #2595
Posted by ro...@apache.org.
Closes #2595
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/412b610e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/412b610e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/412b610e
Branch: refs/heads/master
Commit: 412b610edbde16e6f3ce1ae7b0178cb17854f954
Parents: 26c61f4 87a475e
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Wed Apr 26 18:14:14 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Apr 26 18:14:14 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 18 ++--
sdks/python/apache_beam/transforms/core.py | 47 ++++++++--
.../python/apache_beam/transforms/ptransform.py | 40 +++++++++
sdks/python/apache_beam/transforms/window.py | 90 ++++---------------
sdks/python/apache_beam/utils/urns.py | 93 ++++++++++++++++++++
5 files changed, 199 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
[2/5] beam git commit: Per-transform runner api dispatch.
Posted by ro...@apache.org.
Per-transform runner api dispatch.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97c9b174
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97c9b174
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97c9b174
Branch: refs/heads/master
Commit: 97c9b174d027e58ac5202cc1eedeaec59b57023a
Parents: 26c61f4
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Apr 18 15:29:04 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Apr 26 18:14:12 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 18 ++++-----
.../python/apache_beam/transforms/ptransform.py | 40 ++++++++++++++++++++
2 files changed, 48 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/97c9b174/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 0f4c8db..100c50a 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -52,7 +52,6 @@ import os
import shutil
import tempfile
-from google.protobuf import wrappers_pb2
from apache_beam import pvalue
from apache_beam import typehints
from apache_beam.internal import pickler
@@ -60,8 +59,6 @@ from apache_beam.runners import create_runner
from apache_beam.runners import PipelineRunner
from apache_beam.transforms import ptransform
from apache_beam.typehints import TypeCheckError
-from apache_beam.utils import proto_utils
-from apache_beam.utils import urns
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
from apache_beam.utils.pipeline_options import StandardOptions
@@ -514,12 +511,15 @@ class AppliedPTransform(object):
def to_runner_api(self, context):
from apache_beam.runners.api import beam_runner_api_pb2
+
+ def transform_to_runner_api(transform, context):
+ if transform is None:
+ return None
+ else:
+ return transform.to_runner_api(context)
return beam_runner_api_pb2.PTransform(
unique_name=self.full_label,
- spec=beam_runner_api_pb2.FunctionSpec(
- urn=urns.PICKLED_TRANSFORM,
- parameter=proto_utils.pack_Any(
- wrappers_pb2.BytesValue(value=pickler.dumps(self.transform)))),
+ spec=transform_to_runner_api(self.transform, context),
subtransforms=[context.transforms.get_id(part) for part in self.parts],
# TODO(BEAM-115): Side inputs.
inputs={tag: context.pcollections.get_id(pc)
@@ -533,9 +533,7 @@ class AppliedPTransform(object):
def from_runner_api(proto, context):
result = AppliedPTransform(
parent=None,
- transform=pickler.loads(
- proto_utils.unpack_Any(proto.spec.parameter,
- wrappers_pb2.BytesValue).value),
+ transform=ptransform.PTransform.from_runner_api(proto.spec, context),
full_label=proto.unique_name,
inputs=[
context.pcollections.get_by_id(id) for id in proto.inputs.values()])
http://git-wip-us.apache.org/repos/asf/beam/blob/97c9b174/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index e2c4428..706b003 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -42,6 +42,8 @@ import operator
import os
import sys
+from google.protobuf import wrappers_pb2
+
from apache_beam import error
from apache_beam import pvalue
from apache_beam import typehints
@@ -54,6 +56,8 @@ from apache_beam.typehints import TypeCheckError
from apache_beam.typehints import validate_composite_type_param
from apache_beam.typehints import WithTypeHints
from apache_beam.typehints.trivial_inference import instance_to_type
+from apache_beam.utils import proto_utils
+from apache_beam.utils import urns
class _PValueishTransform(object):
@@ -412,6 +416,42 @@ class PTransform(WithTypeHints, HasDisplayData):
yield pvalueish
return pvalueish, tuple(_dict_tuple_leaves(pvalueish))
+ _known_urns = {}
+
+ @classmethod
+ def register_urn(cls, urn, parameter_type, constructor):
+ cls._known_urns[urn] = parameter_type, constructor
+
+ def to_runner_api(self, context):
+ from apache_beam.runners.api import beam_runner_api_pb2
+ urn, typed_param = self.to_runner_api_parameter(context)
+ return beam_runner_api_pb2.FunctionSpec(
+ urn=urn,
+ parameter=proto_utils.pack_Any(typed_param))
+
+ @classmethod
+ def from_runner_api(cls, proto, context):
+ if proto is None or not proto.urn:
+ return None
+ parameter_type, constructor = cls._known_urns[proto.urn]
+ return constructor(
+ proto_utils.unpack_Any(proto.parameter, parameter_type),
+ context)
+
+ def to_runner_api_parameter(self, context):
+ return (urns.PICKLED_TRANSFORM,
+ wrappers_pb2.BytesValue(value=pickler.dumps(self)))
+
+ @staticmethod
+ def from_runner_api_parameter(spec_parameter, unused_context):
+ return pickler.loads(spec_parameter.value)
+
+
+PTransform.register_urn(
+ urns.PICKLED_TRANSFORM,
+ wrappers_pb2.BytesValue,
+ PTransform.from_runner_api_parameter)
+
class ChainedPTransform(PTransform):
[3/5] beam git commit: Translate WindowInto through the Runner API.
Posted by ro...@apache.org.
Translate WindowInto through the Runner API.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a95fc199
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a95fc199
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a95fc199
Branch: refs/heads/master
Commit: a95fc199fb7daa7a5e7dd2be7d1eda11748b0e6b
Parents: d096b19
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Apr 18 16:07:32 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Apr 26 18:14:13 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/transforms/core.py | 35 ++++++++++++++++++++-----
sdks/python/apache_beam/utils/urns.py | 1 +
2 files changed, 30 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a95fc199/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 14cc620..64911d6 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -590,6 +590,9 @@ class ParDo(PTransformWithSideInputs):
def __init__(self, fn, *args, **kwargs):
super(ParDo, self).__init__(fn, *args, **kwargs)
+ # TODO(robertwb): Change all uses of the dofn attribute to use fn instead.
+ self.dofn = self.fn
+ self.output_tags = set()
if not isinstance(self.fn, DoFn):
raise TypeError('ParDo must be called with a DoFn instance.')
@@ -615,9 +618,6 @@ class ParDo(PTransformWithSideInputs):
'fn_dd': self.fn}
def expand(self, pcoll):
- self.output_tags = set()
- # TODO(robertwb): Change all uses of the dofn attribute to use fn instead.
- self.dofn = self.fn
return pvalue.PCollection(pcoll.pipeline)
def with_outputs(self, *tags, **main_kw):
@@ -1268,7 +1268,7 @@ class WindowInto(ParDo):
new_windows = self.windowing.windowfn.assign(context)
yield WindowedValue(element, context.timestamp, new_windows)
- def __init__(self, windowfn, *args, **kwargs):
+ def __init__(self, windowfn, **kwargs):
"""Initializes a WindowInto transform.
Args:
@@ -1279,8 +1279,7 @@ class WindowInto(ParDo):
output_time_fn = kwargs.pop('output_time_fn', None)
self.windowing = Windowing(windowfn, triggerfn, accumulation_mode,
output_time_fn)
- dofn = self.WindowIntoFn(self.windowing)
- super(WindowInto, self).__init__(dofn)
+ super(WindowInto, self).__init__(self.WindowIntoFn(self.windowing))
def get_windowing(self, unused_inputs):
return self.windowing
@@ -1297,6 +1296,30 @@ class WindowInto(ParDo):
self.with_output_types(output_type)
return super(WindowInto, self).expand(pcoll)
+ def to_runner_api_parameter(self, context):
+ return (
+ urns.WINDOW_INTO_TRANSFORM,
+ self.windowing.to_runner_api(context))
+
+ @staticmethod
+ def from_runner_api_parameter(proto, context):
+ windowing = Windowing.from_runner_api(proto, context)
+ return WindowInto(
+ windowing.windowfn,
+ trigger=windowing.triggerfn,
+ accumulation_mode=windowing.accumulation_mode,
+ output_time_fn=windowing.output_time_fn)
+
+
+PTransform.register_urn(
+ urns.WINDOW_INTO_TRANSFORM,
+ # TODO(robertwb): Update WindowIntoPayload to include the full strategy.
+ # (Right now only WindowFn is used, but we need this to reconstitute the
+ # WindowInto transform, and in the future will need it at runtime to
+ # support meta-data driven triggers.)
+ beam_runner_api_pb2.WindowingStrategy,
+ WindowInto.from_runner_api_parameter)
+
# Python's pickling is broken for nested classes.
WindowIntoFn = WindowInto.WindowIntoFn
http://git-wip-us.apache.org/repos/asf/beam/blob/a95fc199/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index d10dd26..a2f3a3e 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -25,3 +25,4 @@ PICKLED_CODER = "beam:coder:pickled_python:v0.1"
PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1"
+WINDOW_INTO_TRANSFORM = "beam:ptransform:window_into:v0.1"