You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/03/17 21:46:44 UTC

[13/50] [abbrv] beam git commit: Move pipeline context and add more tests.

Move pipeline context and add more tests.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/deff128f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/deff128f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/deff128f

Branch: refs/heads/gearpump-runner
Commit: deff128ff07ccc67956e1d5a94a64f2a31b224c8
Parents: b2da21e
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Mar 9 09:21:33 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 9 20:29:02 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coders.py        | 93 ++++++++++++++++++++
 sdks/python/apache_beam/pipeline.py             | 62 -------------
 .../apache_beam/runners/pipeline_context.py     | 88 ++++++++++++++++++
 .../runners/pipeline_context_test.py            | 49 +++++++++++
 sdks/python/apache_beam/transforms/core.py      |  1 +
 .../apache_beam/transforms/trigger_test.py      | 18 +---
 sdks/python/apache_beam/transforms/window.py    |  2 +-
 .../apache_beam/transforms/window_test.py       |  6 +-
 sdks/python/apache_beam/utils/urns.py           |  2 +-
 9 files changed, 238 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index fd72af8..9f5a97a 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -266,6 +266,12 @@ class BytesCoder(FastCoder):
   def is_deterministic(self):
     return True
 
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def __hash__(self):
+    return hash(type(self))
+
 
 class VarIntCoder(FastCoder):
   """Variable-length integer coder."""
@@ -276,6 +282,12 @@ class VarIntCoder(FastCoder):
   def is_deterministic(self):
     return True
 
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def __hash__(self):
+    return hash(type(self))
+
 
 class FloatCoder(FastCoder):
   """A coder used for floating-point values."""
@@ -286,6 +298,12 @@ class FloatCoder(FastCoder):
   def is_deterministic(self):
     return True
 
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def __hash__(self):
+    return hash(type(self))
+
 
 class TimestampCoder(FastCoder):
   """A coder used for timeutil.Timestamp values."""
@@ -296,6 +314,12 @@ class TimestampCoder(FastCoder):
   def is_deterministic(self):
     return True
 
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def __hash__(self):
+    return hash(type(self))
+
 
 class SingletonCoder(FastCoder):
   """A coder that always encodes exactly one value."""
@@ -309,6 +333,12 @@ class SingletonCoder(FastCoder):
   def is_deterministic(self):
     return True
 
+  def __eq__(self, other):
+    return type(self) == type(other) and self._value == other._value
+
+  def __hash__(self):
+    return hash(self._value)
+
 
 def maybe_dill_dumps(o):
   """Pickle using cPickle or the Dill pickler as a fallback."""
@@ -365,6 +395,12 @@ class _PickleCoderBase(FastCoder):
   def value_coder(self):
     return self
 
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def __hash__(self):
+    return hash(type(self))
+
 
 class PickleCoder(_PickleCoderBase):
   """Coder using Python's pickle functionality."""
@@ -446,6 +482,12 @@ class FastPrimitivesCoder(FastCoder):
   def value_coder(self):
     return self
 
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def __hash__(self):
+    return hash(type(self))
+
 
 class Base64PickleCoder(Coder):
   """Coder of objects by Python pickle, then base64 encoding."""
@@ -503,6 +545,13 @@ class ProtoCoder(FastCoder):
     # a Map.
     return False
 
+  def __eq__(self, other):
+    return (type(self) == type(other)
+            and self.proto_message_type == other.proto_message_type)
+
+  def __hash__(self):
+    return hash(self.proto_message_type)
+
   @staticmethod
   def from_type_hint(typehint, unused_registry):
     if issubclass(typehint, google.protobuf.message.Message):
@@ -563,6 +612,13 @@ class TupleCoder(FastCoder):
   def __repr__(self):
     return 'TupleCoder[%s]' % ', '.join(str(c) for c in self._coders)
 
+  def __eq__(self, other):
+    return (type(self) == type(other)
+            and self._coders == other._coders)
+
+  def __hash__(self):
+    return hash(self._coders)
+
 
 class TupleSequenceCoder(FastCoder):
   """Coder of homogeneous tuple objects."""
@@ -586,6 +642,13 @@ class TupleSequenceCoder(FastCoder):
   def __repr__(self):
     return 'TupleSequenceCoder[%r]' % self._elem_coder
 
+  def __eq__(self, other):
+    return (type(self) == type(other)
+            and self._elem_coder == other._elem_coder)
+
+  def __hash__(self):
+    return hash((type(self), self._elem_coder))
+
 
 class IterableCoder(FastCoder):
   """Coder of iterables of homogeneous objects."""
@@ -619,6 +682,13 @@ class IterableCoder(FastCoder):
   def __repr__(self):
     return 'IterableCoder[%r]' % self._elem_coder
 
+  def __eq__(self, other):
+    return (type(self) == type(other)
+            and self._elem_coder == other._elem_coder)
+
+  def __hash__(self):
+    return hash((type(self), self._elem_coder))
+
 
 class WindowCoder(PickleCoder):
   """Coder for windows in windowed values."""
@@ -663,6 +733,12 @@ class IntervalWindowCoder(FastCoder):
         '@type': 'kind:interval_window',
     }
 
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def __hash__(self):
+    return hash(type(self))
+
 
 class WindowedValueCoder(FastCoder):
   """Coder for windowed values."""
@@ -709,6 +785,16 @@ class WindowedValueCoder(FastCoder):
   def __repr__(self):
     return 'WindowedValueCoder[%s]' % self.wrapped_value_coder
 
+  def __eq__(self, other):
+    return (type(self) == type(other)
+            and self.wrapped_value_coder == other.wrapped_value_coder
+            and self.timestamp_coder == other.timestamp_coder
+            and self.window_coder == other.window_coder)
+
+  def __hash__(self):
+    return hash(
+        (self.wrapped_value_coder, self.timestamp_coder, self.window_coder))
+
 
 class LengthPrefixCoder(FastCoder):
   """Coder which prefixes the length of the encoded object in the stream."""
@@ -740,3 +826,10 @@ class LengthPrefixCoder(FastCoder):
 
   def __repr__(self):
     return 'LengthPrefixCoder[%r]' % self._value_coder
+
+  def __eq__(self, other):
+    return (type(self) == type(other)
+            and self._value_coder == other._value_coder)
+
+  def __hash__(self):
+    return hash((type(self), self._value_coder))

http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 9edcf9b..7db39a9 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -52,14 +52,11 @@ import os
 import shutil
 import tempfile
 
-from apache_beam import coders
 from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.internal import pickler
 from apache_beam.runners import create_runner
 from apache_beam.runners import PipelineRunner
-from apache_beam.runners.api import beam_runner_api_pb2
-from apache_beam.transforms import core
 from apache_beam.transforms import ptransform
 from apache_beam.typehints import TypeCheckError
 from apache_beam.utils.pipeline_options import PipelineOptions
@@ -443,62 +440,3 @@ class AppliedPTransform(object):
         if v not in visited:
           visited.add(v)
           visitor.visit_value(v, self)
-
-
-class PipelineContextMap(object):
-  """This is a bi-directional map between objects and ids.
-
-  Under the hood it encodes and decodes these objects into runner API
-  representations.
-  """
-  def __init__(self, context, obj_type, proto_map=None):
-    self._pipeline_context = context
-    self._obj_type = obj_type
-    self._obj_to_id = {}
-    self._id_to_obj = {}
-    self._id_to_proto = proto_map if proto_map else {}
-    self._counter = 0
-
-  def _unique_ref(self):
-    self._counter += 1
-    return "ref_%s_%s" % (self._obj_type.__name__, self._counter)
-
-  def populate_map(self, proto_map):
-    for id, obj in self._id_to_obj:
-      proto_map[id] = self._id_to_proto[id]
-
-  def get_id(self, obj):
-    if obj not in self._obj_to_id:
-      id = self._unique_ref()
-      self._id_to_obj[id] = obj
-      self._obj_to_id[obj] = id
-      self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
-    return self._obj_to_id[obj]
-
-  def get_by_id(self, id):
-    if id not in self._id_to_obj:
-      self._id_to_obj[id] = self._obj_type.from_runner_api(
-        self._id_to_proto[id], self._pipeline_context)
-    return self._id_to_obj[id]
-
-
-class PipelineContext(object):
-
-  _COMPONENT_TYPES = {
-    'transforms': AppliedPTransform,
-    'pcollections': pvalue.PCollection,
-    'coders': coders.Coder,
-    'windowing_strategies': core.Windowing,
-    # TODO: environment
-  }
-
-  def __init__(self, context_proto=None):
-    for name, cls in self._COMPONENT_TYPES.items():
-      setattr(self, name,
-              PipelineContextMap(self, cls, getattr(context_proto, name, None)))
-
-  def to_runner_api(self):
-    context_proto = beam_runner_api_pb2.Components()
-    for name, cls in self._COMPONENT_TYPES:
-      getattr(self, name).populate_map(getattr(context_proto, name))
-    return context_proto

http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/runners/pipeline_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
new file mode 100644
index 0000000..4f82774
--- /dev/null
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from apache_beam import pipeline
+from apache_beam import pvalue
+from apache_beam import coders
+from apache_beam.runners.api import beam_runner_api_pb2
+from apache_beam.transforms import core
+
+
+class _PipelineContextMap(object):
+  """This is a bi-directional map between objects and ids.
+
+  Under the hood it encodes and decodes these objects into runner API
+  representations.
+  """
+  def __init__(self, context, obj_type, proto_map=None):
+    self._pipeline_context = context
+    self._obj_type = obj_type
+    self._obj_to_id = {}
+    self._id_to_obj = {}
+    self._id_to_proto = proto_map if proto_map else {}
+    self._counter = 0
+
+  def _unique_ref(self):
+    self._counter += 1
+    return "ref_%s_%s" % (self._obj_type.__name__, self._counter)
+
+  def populate_map(self, proto_map):
+    for id, proto in self._id_to_proto.items():
+      proto_map[id].CopyFrom(proto)
+
+  def get_id(self, obj):
+    if obj not in self._obj_to_id:
+      id = self._unique_ref()
+      self._id_to_obj[id] = obj
+      self._obj_to_id[obj] = id
+      self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
+    return self._obj_to_id[obj]
+
+  def get_by_id(self, id):
+    if id not in self._id_to_obj:
+      self._id_to_obj[id] = self._obj_type.from_runner_api(
+          self._id_to_proto[id], self._pipeline_context)
+    return self._id_to_obj[id]
+
+
+class PipelineContext(object):
+  """Used for accessing and constructing the referenced objects of a Pipeline.
+  """
+
+  _COMPONENT_TYPES = {
+      'transforms': pipeline.AppliedPTransform,
+      'pcollections': pvalue.PCollection,
+      'coders': coders.Coder,
+      'windowing_strategies': core.Windowing,
+      # TODO: environment
+  }
+
+  def __init__(self, context_proto=None):
+    for name, cls in self._COMPONENT_TYPES.items():
+      setattr(
+          self, name, _PipelineContextMap(
+              self, cls, getattr(context_proto, name, None)))
+
+  @staticmethod
+  def from_runner_api(proto):
+    return PipelineContext(proto)
+
+  def to_runner_api(self):
+    context_proto = beam_runner_api_pb2.Components()
+    for name in self._COMPONENT_TYPES:
+      getattr(self, name).populate_map(getattr(context_proto, name))
+    return context_proto

http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/runners/pipeline_context_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/pipeline_context_test.py b/sdks/python/apache_beam/runners/pipeline_context_test.py
new file mode 100644
index 0000000..6091ed8
--- /dev/null
+++ b/sdks/python/apache_beam/runners/pipeline_context_test.py
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the pipeline context classes."""
+
+import unittest
+
+from apache_beam import coders
+from apache_beam.runners import pipeline_context
+
+
+class PipelineContextTest(unittest.TestCase):
+
+  def test_deduplication(self):
+    context = pipeline_context.PipelineContext()
+    bytes_coder_ref = context.coders.get_id(coders.BytesCoder())
+    bytes_coder_ref2 = context.coders.get_id(coders.BytesCoder())
+    self.assertEqual(bytes_coder_ref, bytes_coder_ref2)
+
+  def test_serialization(self):
+    context = pipeline_context.PipelineContext()
+    float_coder_ref = context.coders.get_id(coders.FloatCoder())
+    bytes_coder_ref = context.coders.get_id(coders.BytesCoder())
+    proto = context.to_runner_api()
+    context2 = pipeline_context.PipelineContext.from_runner_api(proto)
+    self.assertEqual(
+        coders.FloatCoder(),
+        context2.coders.get_by_id(float_coder_ref))
+    self.assertEqual(
+        coders.BytesCoder(),
+        context2.coders.get_by_id(bytes_coder_ref))
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 1fc63b2..3251671 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1235,6 +1235,7 @@ class Windowing(object):
         trigger=self.triggerfn.to_runner_api(context),
         accumulation_mode=self.accumulation_mode,
         output_time=self.output_time_fn,
+        # TODO(robertwb): Support EMIT_IF_NONEMPTY
         closing_behavior=beam_runner_api_pb2.EMIT_ALWAYS,
         allowed_lateness=0)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index cc9e0f5..827aa33 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -25,6 +25,7 @@ import unittest
 import yaml
 
 import apache_beam as beam
+from apache_beam.runners import pipeline_context
 from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms import trigger
 from apache_beam.transforms.core import Windowing
@@ -392,22 +393,7 @@ class RunnerApiTest(unittest.TestCase):
         AfterWatermark(early=AfterCount(1000), late=AfterCount(1)),
         Repeatedly(AfterCount(100)),
         trigger.OrFinally(AfterCount(3), AfterCount(10))):
-      context = beam.pipeline.PipelineContext()
-      self.assertEqual(
-          trigger_fn,
-          TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), context))
-
-  def test_windowing_strategy_encoding(self):
-    for trigger_fn in (
-        DefaultTrigger(),
-        AfterAll(AfterCount(1), AfterCount(10)),
-        AfterFirst(AfterCount(10), AfterCount(100)),
-        AfterEach(AfterCount(100), AfterCount(1000)),
-        AfterWatermark(early=AfterCount(1000)),
-        AfterWatermark(early=AfterCount(1000), late=AfterCount(1)),
-        Repeatedly(AfterCount(100)),
-        trigger.OrFinally(AfterCount(3), AfterCount(10))):
-      context = beam.pipeline.PipelineContext()
+      context = pipeline_context.PipelineContext()
       self.assertEqual(
           trigger_fn,
           TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), context))

http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index c763a96..3878dff 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -73,6 +73,7 @@ class OutputTimeFn(object):
   OUTPUT_AT_EOW = beam_runner_api_pb2.END_OF_WINDOW
   OUTPUT_AT_EARLIEST = beam_runner_api_pb2.EARLIEST_IN_PANE
   OUTPUT_AT_LATEST = beam_runner_api_pb2.LATEST_IN_PANE
+  # TODO(robertwb): Add this to the runner API or remove it.
   OUTPUT_AT_EARLIEST_TRANSFORMED = 'OUTPUT_AT_EARLIEST_TRANSFORMED'
 
   @staticmethod
@@ -167,7 +168,6 @@ class WindowFn(object):
     return pickler.loads(fn_parameter.value)
 
   def to_runner_api_parameter(self, context):
-    raise TypeError(self)
     return (urns.PICKLED_WINDOW_FN,
             wrappers_pb2.BytesValue(value=pickler.dumps(self)))
 

http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index c79739a..99be02c 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -19,7 +19,7 @@
 
 import unittest
 
-from apache_beam import pipeline
+from apache_beam.runners import pipeline_context
 from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms import CombinePerKey
 from apache_beam.transforms import combiners
@@ -238,7 +238,7 @@ class RunnerApiTest(unittest.TestCase):
                       FixedWindows(37),
                       SlidingWindows(2, 389),
                       Sessions(5077)):
-      context = pipeline.PipelineContext()
+      context = pipeline_context.PipelineContext()
       self.assertEqual(
           window_fn,
           WindowFn.from_runner_api(window_fn.to_runner_api(context), context))
@@ -251,7 +251,7 @@ class RunnerApiTest(unittest.TestCase):
         Windowing(SlidingWindows(10, 15, 21), AfterCount(28),
                   output_time_fn=OutputTimeFn.OUTPUT_AT_LATEST,
                   accumulation_mode=AccumulationMode.DISCARDING)):
-      context = pipeline.PipelineContext()
+      context = pipeline_context.PipelineContext()
       self.assertEqual(
           windowing,
           Windowing.from_runner_api(windowing.to_runner_api(context), context))

http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 186c99c..936e2cb 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -21,4 +21,4 @@ FIXED_WINDOWS_FN = "beam:window_fn:fixed_windows:v0.1"
 SLIDING_WINDOWS_FN = "beam:window_fn:sliding_windows:v0.1"
 SESSION_WINDOWS_FN = "beam:window_fn:session_windows:v0.1"
 
-PICKLED_CODER = "dataflow:coder:pickled_python:v0.1"
+PICKLED_CODER = "beam:coder:pickled_python:v0.1"