You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@beam.apache.org by ro...@apache.org on 2017/04/27 01:14:33 UTC

[1/5] beam git commit: Translate flatten to Runner API.

Repository: beam
Updated Branches:
  refs/heads/master 26c61f414 -> 412b610ed


Translate flatten to Runner API.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d096b19a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d096b19a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d096b19a

Branch: refs/heads/master
Commit: d096b19aaf2fa273dd1a13ac7ff96d2e0be030bd
Parents: 97c9b17
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Apr 18 15:51:50 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Apr 26 18:14:12 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/core.py | 12 ++++++++++++
 sdks/python/apache_beam/utils/urns.py      |  1 +
 2 files changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d096b19a/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 4709056..14cc620 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -48,6 +48,7 @@ from apache_beam.typehints import TypeCheckError
 from apache_beam.typehints import Union
 from apache_beam.typehints import WithTypeHints
 from apache_beam.typehints.trivial_inference import element_type
+from apache_beam.utils import urns
 from apache_beam.utils.pipeline_options import TypeOptions
 
 
@@ -1340,6 +1341,17 @@ class Flatten(PTransform):
       return Windowing(GlobalWindows())
     return super(Flatten, self).get_windowing(inputs)
 
+  def to_runner_api_parameter(self, context):
+    return urns.FLATTEN_TRANSFORM, None
+
+  @staticmethod
+  def from_runner_api_parameter(unused_parameter, unused_context):
+    return Flatten()
+
+
+PTransform.register_urn(
+    urns.FLATTEN_TRANSFORM, None, Flatten.from_runner_api_parameter)
+
 
 class Create(PTransform):
   """A transform that creates a PCollection from an iterable."""

http://git-wip-us.apache.org/repos/asf/beam/blob/d096b19a/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 88fca09..d10dd26 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -24,3 +24,4 @@ SESSION_WINDOWS_FN = "beam:window_fn:session_windows:v0.1"
 PICKLED_CODER = "beam:coder:pickled_python:v0.1"
 
 PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
+FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1"


[4/5] beam git commit: Factor out URN registration logic.

Posted by ro...@apache.org.
Factor out URN registration logic.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/87a475e4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/87a475e4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/87a475e4

Branch: refs/heads/master
Commit: 87a475e40a346d104e5b30e9e2e3f60b9e56916b
Parents: a95fc19
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Wed Apr 19 11:56:55 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Apr 26 18:14:13 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/window.py | 90 +++++-----------------
 sdks/python/apache_beam/utils/urns.py        | 91 +++++++++++++++++++++++
 2 files changed, 108 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/87a475e4/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 7e56c23..9c4b109 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -52,10 +52,8 @@ from __future__ import absolute_import
 import abc
 
 from google.protobuf import struct_pb2
-from google.protobuf import wrappers_pb2
 
 from apache_beam import coders
-from apache_beam.internal import pickler
 from apache_beam.runners.api import beam_runner_api_pb2
 from apache_beam.transforms import timeutil
 from apache_beam.transforms.timeutil import Duration
@@ -92,7 +90,7 @@ class OutputTimeFn(object):
       raise ValueError('Invalid OutputTimeFn: %s.' % output_time_fn)
 
 
-class WindowFn(object):
+class WindowFn(urns.RunnerApiFn):
   """An abstract windowing function defining a basic assign and merge."""
 
   __metaclass__ = abc.ABCMeta
@@ -150,39 +148,7 @@ class WindowFn(object):
     # By default, just return the input timestamp.
     return input_timestamp
 
-  _known_urns = {}
-
-  @classmethod
-  def register_urn(cls, urn, parameter_type, constructor):
-    cls._known_urns[urn] = parameter_type, constructor
-
-  @classmethod
-  def from_runner_api(cls, fn_proto, context):
-    parameter_type, constructor = cls._known_urns[fn_proto.spec.urn]
-    return constructor(
-        proto_utils.unpack_Any(fn_proto.spec.parameter, parameter_type),
-        context)
-
-  def to_runner_api(self, context):
-    urn, typed_param = self.to_runner_api_parameter(context)
-    return beam_runner_api_pb2.SdkFunctionSpec(
-        spec=beam_runner_api_pb2.FunctionSpec(
-            urn=urn,
-            parameter=proto_utils.pack_Any(typed_param)))
-
-  @staticmethod
-  def from_runner_api_parameter(fn_parameter, unused_context):
-    return pickler.loads(fn_parameter.value)
-
-  def to_runner_api_parameter(self, context):
-    return (urns.PICKLED_WINDOW_FN,
-            wrappers_pb2.BytesValue(value=pickler.dumps(self)))
-
-
-WindowFn.register_urn(
-    urns.PICKLED_WINDOW_FN,
-    wrappers_pb2.BytesValue,
-    WindowFn.from_runner_api_parameter)
+  urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_WINDOW_FN)
 
 
 class BoundedWindow(object):
@@ -315,16 +281,12 @@ class GlobalWindows(NonMergingWindowFn):
   def __ne__(self, other):
     return not self == other
 
-  @staticmethod
-  def from_runner_api_parameter(unused_fn_parameter, unused_context):
-    return GlobalWindows()
-
   def to_runner_api_parameter(self, context):
     return urns.GLOBAL_WINDOWS_FN, None
 
-
-WindowFn.register_urn(
-    urns.GLOBAL_WINDOWS_FN, None, GlobalWindows.from_runner_api_parameter)
+  @urns.RunnerApiFn.register_urn(urns.GLOBAL_WINDOWS_FN, None)
+  def from_runner_api_parameter(unused_fn_parameter, unused_context):
+    return GlobalWindows()
 
 
 class FixedWindows(NonMergingWindowFn):
@@ -362,22 +324,16 @@ class FixedWindows(NonMergingWindowFn):
   def __ne__(self, other):
     return not self == other
 
-  @staticmethod
-  def from_runner_api_parameter(fn_parameter, unused_context):
-    return FixedWindows(
-        size=Duration(micros=fn_parameter['size']),
-        offset=Timestamp(micros=fn_parameter['offset']))
-
   def to_runner_api_parameter(self, context):
     return (urns.FIXED_WINDOWS_FN,
             proto_utils.pack_Struct(size=self.size.micros,
                                     offset=self.offset.micros))
 
-
-WindowFn.register_urn(
-    urns.FIXED_WINDOWS_FN,
-    struct_pb2.Struct,
-    FixedWindows.from_runner_api_parameter)
+  @urns.RunnerApiFn.register_urn(urns.FIXED_WINDOWS_FN, struct_pb2.Struct)
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    return FixedWindows(
+        size=Duration(micros=fn_parameter['size']),
+        offset=Timestamp(micros=fn_parameter['offset']))
 
 
 class SlidingWindows(NonMergingWindowFn):
@@ -419,13 +375,6 @@ class SlidingWindows(NonMergingWindowFn):
               and self.offset == other.offset
               and self.period == other.period)
 
-  @staticmethod
-  def from_runner_api_parameter(fn_parameter, unused_context):
-    return SlidingWindows(
-        size=Duration(micros=fn_parameter['size']),
-        offset=Timestamp(micros=fn_parameter['offset']),
-        period=Duration(micros=fn_parameter['period']))
-
   def to_runner_api_parameter(self, context):
     return (urns.SLIDING_WINDOWS_FN,
             proto_utils.pack_Struct(
@@ -433,11 +382,12 @@ class SlidingWindows(NonMergingWindowFn):
                 offset=self.offset.micros,
                 period=self.period.micros))
 
-
-WindowFn.register_urn(
-    urns.SLIDING_WINDOWS_FN,
-    struct_pb2.Struct,
-    SlidingWindows.from_runner_api_parameter)
+  @urns.RunnerApiFn.register_urn(urns.SLIDING_WINDOWS_FN, struct_pb2.Struct)
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    return SlidingWindows(
+        size=Duration(micros=fn_parameter['size']),
+        offset=Timestamp(micros=fn_parameter['offset']),
+        period=Duration(micros=fn_parameter['period']))
 
 
 class Sessions(WindowFn):
@@ -487,16 +437,10 @@ class Sessions(WindowFn):
     if type(self) == type(other) == Sessions:
       return self.gap_size == other.gap_size
 
-  @staticmethod
+  @urns.RunnerApiFn.register_urn(urns.SESSION_WINDOWS_FN, struct_pb2.Struct)
   def from_runner_api_parameter(fn_parameter, unused_context):
     return Sessions(gap_size=Duration(micros=fn_parameter['gap_size']))
 
   def to_runner_api_parameter(self, context):
     return (urns.SESSION_WINDOWS_FN,
             proto_utils.pack_Struct(gap_size=self.gap_size.micros))
-
-
-WindowFn.register_urn(
-    urns.SESSION_WINDOWS_FN,
-    struct_pb2.Struct,
-    Sessions.from_runner_api_parameter)

http://git-wip-us.apache.org/repos/asf/beam/blob/87a475e4/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index a2f3a3e..46bd8f5 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -15,6 +15,15 @@
 # limitations under the License.
 #
 
+import abc
+import inspect
+
+from google.protobuf import wrappers_pb2
+
+from apache_beam.internal import pickler
+from apache_beam.utils import proto_utils
+
+
 PICKLED_WINDOW_FN = "beam:window_fn:pickled_python:v0.1"
 GLOBAL_WINDOWS_FN = "beam:window_fn:global_windows:v0.1"
 FIXED_WINDOWS_FN = "beam:window_fn:fixed_windows:v0.1"
@@ -26,3 +35,85 @@ PICKLED_CODER = "beam:coder:pickled_python:v0.1"
 PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
 FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1"
 WINDOW_INTO_TRANSFORM = "beam:ptransform:window_into:v0.1"
+
+
+class RunnerApiFn(object):
+  """Abstract base class that provides urn registration utilities.
+
+  A class that inherits from this class will get a registration-based
+  from_runner_api and to_runner_api methods that convert to and from
+  beam_runner_api_pb2.SdkFunctionSpec.
+
+  Additionally, register_pickle_urn can be called from the body of a class
+  to register serialization via pickling.
+  """
+
+  __metaclass__ = abc.ABCMeta
+
+  _known_urns = {}
+
+  @abc.abstractmethod
+  def to_runner_api_parameter(self, unused_context):
+    """Returns the urn and payload for this Fn.
+
+    The returned urn(s) should be registered with `register_urn`.
+    """
+    pass
+
+  @classmethod
+  def register_urn(cls, urn, parameter_type, fn=None):
+    """Registers a urn with a constructor.
+
+    For example, if 'beam:fn:foo' had parameter type FooPayload, one could
+    write `RunnerApiFn.register_urn('beam:fn:foo', FooPayload, foo_from_proto)`
+    where foo_from_proto took as arguments a FooPayload and a PipelineContext.
+    This function can also be used as a decorator rather than passing the
+    callable in as the final parameter.
+
+    A corresponding to_runner_api_parameter method is expected to return
+    the tuple ('beam:fn:foo', foo_payload), where foo_payload is a FooPayload.
+    """
+    def register(fn):
+      cls._known_urns[urn] = parameter_type, fn
+      return staticmethod(fn)
+    if fn:
+      # Used as a statement.
+      register(fn)
+    else:
+      # Used as a decorator.
+      return register
+
+  @classmethod
+  def register_pickle_urn(cls, pickle_urn):
+    """Registers and implements the given urn via pickling.
+    """
+    inspect.currentframe().f_back.f_locals['to_runner_api_parameter'] = (
+        lambda self, context: (
+            pickle_urn, wrappers_pb2.BytesValue(value=pickler.dumps(self))))
+    cls.register_urn(
+        pickle_urn,
+        wrappers_pb2.BytesValue,
+        lambda proto, unused_context: pickler.loads(proto.value))
+
+  def to_runner_api(self, context):
+    """Returns an SdkFunctionSpec encoding this Fn.
+
+    Prefer overriding self.to_runner_api_parameter.
+    """
+    from apache_beam.runners.api import beam_runner_api_pb2
+    urn, typed_param = self.to_runner_api_parameter(context)
+    return beam_runner_api_pb2.SdkFunctionSpec(
+        spec=beam_runner_api_pb2.FunctionSpec(
+            urn=urn,
+            parameter=proto_utils.pack_Any(typed_param)))
+
+  @classmethod
+  def from_runner_api(cls, fn_proto, context):
+    """Converts from an SdkFunctionSpec to a Fn object.
+
+    Prefer registering a urn with its parameter type and constructor.
+    """
+    parameter_type, constructor = cls._known_urns[fn_proto.spec.urn]
+    return constructor(
+        proto_utils.unpack_Any(fn_proto.spec.parameter, parameter_type),
+        context)


[5/5] beam git commit: Closes #2595

Posted by ro...@apache.org.
Closes #2595


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/412b610e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/412b610e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/412b610e

Branch: refs/heads/master
Commit: 412b610edbde16e6f3ce1ae7b0178cb17854f954
Parents: 26c61f4 87a475e
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Wed Apr 26 18:14:14 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Apr 26 18:14:14 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py             | 18 ++--
 sdks/python/apache_beam/transforms/core.py      | 47 ++++++++--
 .../python/apache_beam/transforms/ptransform.py | 40 +++++++++
 sdks/python/apache_beam/transforms/window.py    | 90 ++++---------------
 sdks/python/apache_beam/utils/urns.py           | 93 ++++++++++++++++++++
 5 files changed, 199 insertions(+), 89 deletions(-)
----------------------------------------------------------------------



[2/5] beam git commit: Per-transform runner api dispatch.

Posted by ro...@apache.org.
Per-transform runner api dispatch.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97c9b174
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97c9b174
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97c9b174

Branch: refs/heads/master
Commit: 97c9b174d027e58ac5202cc1eedeaec59b57023a
Parents: 26c61f4
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Apr 18 15:29:04 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Apr 26 18:14:12 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py             | 18 ++++-----
 .../python/apache_beam/transforms/ptransform.py | 40 ++++++++++++++++++++
 2 files changed, 48 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/97c9b174/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 0f4c8db..100c50a 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -52,7 +52,6 @@ import os
 import shutil
 import tempfile
 
-from google.protobuf import wrappers_pb2
 from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.internal import pickler
@@ -60,8 +59,6 @@ from apache_beam.runners import create_runner
 from apache_beam.runners import PipelineRunner
 from apache_beam.transforms import ptransform
 from apache_beam.typehints import TypeCheckError
-from apache_beam.utils import proto_utils
-from apache_beam.utils import urns
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 from apache_beam.utils.pipeline_options import StandardOptions
@@ -514,12 +511,15 @@ class AppliedPTransform(object):
 
   def to_runner_api(self, context):
     from apache_beam.runners.api import beam_runner_api_pb2
+
+    def transform_to_runner_api(transform, context):
+      if transform is None:
+        return None
+      else:
+        return transform.to_runner_api(context)
     return beam_runner_api_pb2.PTransform(
         unique_name=self.full_label,
-        spec=beam_runner_api_pb2.FunctionSpec(
-            urn=urns.PICKLED_TRANSFORM,
-            parameter=proto_utils.pack_Any(
-                wrappers_pb2.BytesValue(value=pickler.dumps(self.transform)))),
+        spec=transform_to_runner_api(self.transform, context),
         subtransforms=[context.transforms.get_id(part) for part in self.parts],
         # TODO(BEAM-115): Side inputs.
         inputs={tag: context.pcollections.get_id(pc)
@@ -533,9 +533,7 @@ class AppliedPTransform(object):
   def from_runner_api(proto, context):
     result = AppliedPTransform(
         parent=None,
-        transform=pickler.loads(
-            proto_utils.unpack_Any(proto.spec.parameter,
-                                   wrappers_pb2.BytesValue).value),
+        transform=ptransform.PTransform.from_runner_api(proto.spec, context),
         full_label=proto.unique_name,
         inputs=[
             context.pcollections.get_by_id(id) for id in proto.inputs.values()])

http://git-wip-us.apache.org/repos/asf/beam/blob/97c9b174/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index e2c4428..706b003 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -42,6 +42,8 @@ import operator
 import os
 import sys
 
+from google.protobuf import wrappers_pb2
+
 from apache_beam import error
 from apache_beam import pvalue
 from apache_beam import typehints
@@ -54,6 +56,8 @@ from apache_beam.typehints import TypeCheckError
 from apache_beam.typehints import validate_composite_type_param
 from apache_beam.typehints import WithTypeHints
 from apache_beam.typehints.trivial_inference import instance_to_type
+from apache_beam.utils import proto_utils
+from apache_beam.utils import urns
 
 
 class _PValueishTransform(object):
@@ -412,6 +416,42 @@ class PTransform(WithTypeHints, HasDisplayData):
         yield pvalueish
     return pvalueish, tuple(_dict_tuple_leaves(pvalueish))
 
+  _known_urns = {}
+
+  @classmethod
+  def register_urn(cls, urn, parameter_type, constructor):
+    cls._known_urns[urn] = parameter_type, constructor
+
+  def to_runner_api(self, context):
+    from apache_beam.runners.api import beam_runner_api_pb2
+    urn, typed_param = self.to_runner_api_parameter(context)
+    return beam_runner_api_pb2.FunctionSpec(
+        urn=urn,
+        parameter=proto_utils.pack_Any(typed_param))
+
+  @classmethod
+  def from_runner_api(cls, proto, context):
+    if proto is None or not proto.urn:
+      return None
+    parameter_type, constructor = cls._known_urns[proto.urn]
+    return constructor(
+        proto_utils.unpack_Any(proto.parameter, parameter_type),
+        context)
+
+  def to_runner_api_parameter(self, context):
+    return (urns.PICKLED_TRANSFORM,
+            wrappers_pb2.BytesValue(value=pickler.dumps(self)))
+
+  @staticmethod
+  def from_runner_api_parameter(spec_parameter, unused_context):
+    return pickler.loads(spec_parameter.value)
+
+
+PTransform.register_urn(
+    urns.PICKLED_TRANSFORM,
+    wrappers_pb2.BytesValue,
+    PTransform.from_runner_api_parameter)
+
 
 class ChainedPTransform(PTransform):
 


[3/5] beam git commit: Translate WindowInto through the Runner API.

Posted by ro...@apache.org.
Translate WindowInto through the Runner API.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a95fc199
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a95fc199
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a95fc199

Branch: refs/heads/master
Commit: a95fc199fb7daa7a5e7dd2be7d1eda11748b0e6b
Parents: d096b19
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Apr 18 16:07:32 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Apr 26 18:14:13 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/core.py | 35 ++++++++++++++++++++-----
 sdks/python/apache_beam/utils/urns.py      |  1 +
 2 files changed, 30 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a95fc199/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 14cc620..64911d6 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -590,6 +590,9 @@ class ParDo(PTransformWithSideInputs):
 
   def __init__(self, fn, *args, **kwargs):
     super(ParDo, self).__init__(fn, *args, **kwargs)
+    # TODO(robertwb): Change all uses of the dofn attribute to use fn instead.
+    self.dofn = self.fn
+    self.output_tags = set()
 
     if not isinstance(self.fn, DoFn):
       raise TypeError('ParDo must be called with a DoFn instance.')
@@ -615,9 +618,6 @@ class ParDo(PTransformWithSideInputs):
             'fn_dd': self.fn}
 
   def expand(self, pcoll):
-    self.output_tags = set()
-    # TODO(robertwb): Change all uses of the dofn attribute to use fn instead.
-    self.dofn = self.fn
     return pvalue.PCollection(pcoll.pipeline)
 
   def with_outputs(self, *tags, **main_kw):
@@ -1268,7 +1268,7 @@ class WindowInto(ParDo):
       new_windows = self.windowing.windowfn.assign(context)
       yield WindowedValue(element, context.timestamp, new_windows)
 
-  def __init__(self, windowfn, *args, **kwargs):
+  def __init__(self, windowfn, **kwargs):
     """Initializes a WindowInto transform.
 
     Args:
@@ -1279,8 +1279,7 @@ class WindowInto(ParDo):
     output_time_fn = kwargs.pop('output_time_fn', None)
     self.windowing = Windowing(windowfn, triggerfn, accumulation_mode,
                                output_time_fn)
-    dofn = self.WindowIntoFn(self.windowing)
-    super(WindowInto, self).__init__(dofn)
+    super(WindowInto, self).__init__(self.WindowIntoFn(self.windowing))
 
   def get_windowing(self, unused_inputs):
     return self.windowing
@@ -1297,6 +1296,30 @@ class WindowInto(ParDo):
       self.with_output_types(output_type)
     return super(WindowInto, self).expand(pcoll)
 
+  def to_runner_api_parameter(self, context):
+    return (
+        urns.WINDOW_INTO_TRANSFORM,
+        self.windowing.to_runner_api(context))
+
+  @staticmethod
+  def from_runner_api_parameter(proto, context):
+    windowing = Windowing.from_runner_api(proto, context)
+    return WindowInto(
+        windowing.windowfn,
+        trigger=windowing.triggerfn,
+        accumulation_mode=windowing.accumulation_mode,
+        output_time_fn=windowing.output_time_fn)
+
+
+PTransform.register_urn(
+    urns.WINDOW_INTO_TRANSFORM,
+    # TODO(robertwb): Update WindowIntoPayload to include the full strategy.
+    # (Right now only WindowFn is used, but we need this to reconstitute the
+    # WindowInto transform, and in the future will need it at runtime to
+    # support meta-data driven triggers.)
+    beam_runner_api_pb2.WindowingStrategy,
+    WindowInto.from_runner_api_parameter)
+
 
 # Python's pickling is broken for nested classes.
 WindowIntoFn = WindowInto.WindowIntoFn

http://git-wip-us.apache.org/repos/asf/beam/blob/a95fc199/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index d10dd26..a2f3a3e 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -25,3 +25,4 @@ PICKLED_CODER = "beam:coder:pickled_python:v0.1"
 
 PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
 FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1"
+WINDOW_INTO_TRANSFORM = "beam:ptransform:window_into:v0.1"