You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/24 22:42:14 UTC

[2/3] beam git commit: Runner API encoding of common coders.

Runner API encoding of common coders.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9cc004fb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9cc004fb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9cc004fb

Branch: refs/heads/master
Commit: 9cc004fb0c32234b541cd622a0d0ab4c5c3d2389
Parents: ef4239a
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Aug 22 10:54:21 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Aug 24 15:41:46 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coders.py        | 42 ++++++++++++++++++--
 .../apache_beam/coders/coders_test_common.py    |  4 +-
 sdks/python/apache_beam/utils/urns.py           | 11 ++++-
 3 files changed, 52 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9cc004fb/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 0ea5f7c..e204369 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -206,9 +206,9 @@ class Coder(object):
 
   @classmethod
   def register_urn(cls, urn, parameter_type, fn=None):
-    """Registeres a urn with a constructor.
+    """Registers a urn with a constructor.
 
-    For example, if 'beam:fn:foo' had paramter type FooPayload, one could
-    write `RunnerApiFn.register_urn('bean:fn:foo', FooPayload, foo_from_proto)`
+    For example, if 'beam:fn:foo' had parameter type FooPayload, one could
+    write `RunnerApiFn.register_urn('beam:fn:foo', FooPayload, foo_from_proto)`
     where foo_from_proto took as arguments a FooPayload and a PipelineContext.
     This function can also be used as a decorator rather than passing the
@@ -228,7 +228,6 @@ class Coder(object):
       return register
 
   def to_runner_api(self, context):
-    from apache_beam.portability.api import beam_runner_api_pb2
     urn, typed_param, components = self.to_runner_api_parameter(context)
     return beam_runner_api_pb2.Coder(
         spec=beam_runner_api_pb2.SdkFunctionSpec(
@@ -257,6 +256,22 @@ class Coder(object):
         google.protobuf.wrappers_pb2.BytesValue(value=serialize_coder(self)),
         ())
 
+  @staticmethod
+  def register_structured_urn(urn, cls):
+    """Registers cls as a coder defined entirely by urn and its components.
+
+    Overrides cls.to_runner_api_parameter; decoding calls cls(*components)."""
+    cls.to_runner_api_parameter = (
+        lambda self, unused_context: (urn, None, self._get_component_coders()))
+
+    # pylint: disable=unused-variable
+    @Coder.register_urn(urn, None)
+    def from_runner_api_parameter(unused_payload, components, unused_context):
+      if components:
+        return cls(*components)
+      else:
+        return cls()
+
 
 @Coder.register_urn(urns.PICKLED_CODER, google.protobuf.wrappers_pb2.BytesValue)
 def _pickle_from_runner_api_parameter(payload, components, context):
@@ -337,6 +352,9 @@ class BytesCoder(FastCoder):
     return hash(type(self))
 
 
+Coder.register_structured_urn(urns.BYTES_CODER, BytesCoder)
+
+
 class VarIntCoder(FastCoder):
   """Variable-length integer coder."""
 
@@ -353,6 +371,9 @@ class VarIntCoder(FastCoder):
     return hash(type(self))
 
 
+Coder.register_structured_urn(urns.VAR_INT_CODER, VarIntCoder)
+
+
 class FloatCoder(FastCoder):
   """A coder used for floating-point values."""
 
@@ -757,6 +778,9 @@ class IterableCoder(FastCoder):
     return hash((type(self), self._elem_coder))
 
 
+Coder.register_structured_urn(urns.ITERABLE_CODER, IterableCoder)
+
+
 class GlobalWindowCoder(SingletonCoder):
   """Coder for global windows."""
 
@@ -770,6 +794,9 @@ class GlobalWindowCoder(SingletonCoder):
     }
 
 
+Coder.register_structured_urn(urns.GLOBAL_WINDOW_CODER, GlobalWindowCoder)
+
+
 class IntervalWindowCoder(FastCoder):
   """Coder for an window defined by a start timestamp and a duration."""
 
@@ -791,6 +818,9 @@ class IntervalWindowCoder(FastCoder):
     return hash(type(self))
 
 
+Coder.register_structured_urn(urns.INTERVAL_WINDOW_CODER, IntervalWindowCoder)
+
+
 class WindowedValueCoder(FastCoder):
   """Coder for windowed values."""
 
@@ -847,6 +877,9 @@ class WindowedValueCoder(FastCoder):
         (self.wrapped_value_coder, self.timestamp_coder, self.window_coder))
 
 
+Coder.register_structured_urn(urns.WINDOWED_VALUE_CODER, WindowedValueCoder)
+
+
 class LengthPrefixCoder(FastCoder):
   """For internal use only; no backwards-compatibility guarantees.
 
@@ -886,3 +919,6 @@ class LengthPrefixCoder(FastCoder):
 
   def __hash__(self):
     return hash((type(self), self._value_coder))
+
+
+Coder.register_structured_urn(urns.LENGTH_PREFIX_CODER, LengthPrefixCoder)

http://git-wip-us.apache.org/repos/asf/beam/blob/9cc004fb/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 577c53a..8b0353d 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -26,6 +26,7 @@ import dill
 from apache_beam.transforms.window import GlobalWindow
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
 import observable
+from apache_beam.runners import pipeline_context
 from apache_beam.transforms import window
 from apache_beam.utils import timestamp
 from apache_beam.utils import windowed_value
@@ -90,7 +91,8 @@ class CodersTest(unittest.TestCase):
       self.assertEqual(coder.get_impl().get_estimated_size_and_observables(v),
                        (coder.get_impl().estimate_size(v), []))
     copy1 = dill.loads(dill.dumps(coder))
-    copy2 = dill.loads(dill.dumps(coder))
+    context = pipeline_context.PipelineContext()
+    copy2 = coders.Coder.from_runner_api(coder.to_runner_api(context), context)
     for v in values:
       self.assertEqual(v, copy1.decode(copy2.encode(v)))
       if coder.is_deterministic():

http://git-wip-us.apache.org/repos/asf/beam/blob/9cc004fb/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index acf729f..18959be 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -35,7 +35,6 @@ SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1"
 PICKLED_DO_FN = "beam:dofn:pickled_python:v0.1"
 PICKLED_DO_FN_INFO = "beam:dofn:pickled_python_info:v0.1"
 PICKLED_COMBINE_FN = "beam:combinefn:pickled_python:v0.1"
-PICKLED_CODER = "beam:coder:pickled_python:v0.1"
 
 PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
 PARDO_TRANSFORM = "beam:ptransform:pardo:v0.1"
@@ -50,6 +49,17 @@ WINDOW_INTO_TRANSFORM = "beam:ptransform:window_into:v0.1"
 
 PICKLED_SOURCE = "beam:source:pickled_python:v0.1"
 
+# Coder urns. All but PICKLED_CODER use the "urn:beam:coders:<name>:0.1" form.
+PICKLED_CODER = "beam:coder:pickled_python:v0.1"
+BYTES_CODER = "urn:beam:coders:bytes:0.1"
+VAR_INT_CODER = "urn:beam:coders:varint:0.1"
+INTERVAL_WINDOW_CODER = "urn:beam:coders:interval_window:0.1"
+ITERABLE_CODER = "urn:beam:coders:stream:0.1"
+KV_CODER = "urn:beam:coders:kv:0.1"
+LENGTH_PREFIX_CODER = "urn:beam:coders:length_prefix:0.1"
+GLOBAL_WINDOW_CODER = "urn:beam:coders:global_window:0.1"
+WINDOWED_VALUE_CODER = "urn:beam:coders:windowed_value:0.1"
+
 
 class RunnerApiFn(object):
   """Abstract base class that provides urn registration utilities.