You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/03/17 21:46:44 UTC
[13/50] [abbrv] beam git commit: Move pipeline context and add more
tests.
Move pipeline context and add more tests.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/deff128f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/deff128f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/deff128f
Branch: refs/heads/gearpump-runner
Commit: deff128ff07ccc67956e1d5a94a64f2a31b224c8
Parents: b2da21e
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Mar 9 09:21:33 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Mar 9 20:29:02 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coders.py | 93 ++++++++++++++++++++
sdks/python/apache_beam/pipeline.py | 62 -------------
.../apache_beam/runners/pipeline_context.py | 88 ++++++++++++++++++
.../runners/pipeline_context_test.py | 49 +++++++++++
sdks/python/apache_beam/transforms/core.py | 1 +
.../apache_beam/transforms/trigger_test.py | 18 +---
sdks/python/apache_beam/transforms/window.py | 2 +-
.../apache_beam/transforms/window_test.py | 6 +-
sdks/python/apache_beam/utils/urns.py | 2 +-
9 files changed, 238 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index fd72af8..9f5a97a 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -266,6 +266,12 @@ class BytesCoder(FastCoder):
def is_deterministic(self):
return True
+ def __eq__(self, other):  # equality depends only on the concrete type
+ return type(self) == type(other)
+
+ def __hash__(self):  # agrees with __eq__, so hash the type only
+ return hash(type(self))
+
class VarIntCoder(FastCoder):
"""Variable-length integer coder."""
@@ -276,6 +282,12 @@ class VarIntCoder(FastCoder):
def is_deterministic(self):
return True
+ def __eq__(self, other):  # equality depends only on the concrete type
+ return type(self) == type(other)
+
+ def __hash__(self):  # agrees with __eq__, so hash the type only
+ return hash(type(self))
+
class FloatCoder(FastCoder):
"""A coder used for floating-point values."""
@@ -286,6 +298,12 @@ class FloatCoder(FastCoder):
def is_deterministic(self):
return True
+ def __eq__(self, other):  # equality depends only on the concrete type
+ return type(self) == type(other)
+
+ def __hash__(self):  # agrees with __eq__, so hash the type only
+ return hash(type(self))
+
class TimestampCoder(FastCoder):
"""A coder used for timeutil.Timestamp values."""
@@ -296,6 +314,12 @@ class TimestampCoder(FastCoder):
def is_deterministic(self):
return True
+ def __eq__(self, other):  # equality depends only on the concrete type
+ return type(self) == type(other)
+
+ def __hash__(self):  # agrees with __eq__, so hash the type only
+ return hash(type(self))
+
class SingletonCoder(FastCoder):
"""A coder that always encodes exactly one value."""
@@ -309,6 +333,12 @@ class SingletonCoder(FastCoder):
def is_deterministic(self):
return True
+ def __eq__(self, other):  # equal iff same type and equal singleton value
+ return type(self) == type(other) and self._value == other._value
+
+ def __hash__(self):  # NOTE(review): raises TypeError if _value is unhashable
+ return hash(self._value)
+
def maybe_dill_dumps(o):
"""Pickle using cPickle or the Dill pickler as a fallback."""
@@ -365,6 +395,12 @@ class _PickleCoderBase(FastCoder):
def value_coder(self):
return self
+ def __eq__(self, other):  # equality depends only on the concrete (sub)class
+ return type(self) == type(other)
+
+ def __hash__(self):  # agrees with __eq__, so hash the type only
+ return hash(type(self))
+
class PickleCoder(_PickleCoderBase):
"""Coder using Python's pickle functionality."""
@@ -446,6 +482,12 @@ class FastPrimitivesCoder(FastCoder):
def value_coder(self):
return self
+ def __eq__(self, other):  # NOTE(review): type-only; confirm no per-instance config (e.g. a fallback coder) must be compared
+ return type(self) == type(other)
+
+ def __hash__(self):  # agrees with __eq__, so hash the type only
+ return hash(type(self))
+
class Base64PickleCoder(Coder):
"""Coder of objects by Python pickle, then base64 encoding."""
@@ -503,6 +545,13 @@ class ProtoCoder(FastCoder):
# a Map.
return False
+ def __eq__(self, other):  # equal iff same type and same proto_message_type
+ return (type(self) == type(other)
+ and self.proto_message_type == other.proto_message_type)
+
+ def __hash__(self):  # agrees with __eq__: hashes proto_message_type
+ return hash(self.proto_message_type)
+
@staticmethod
def from_type_hint(typehint, unused_registry):
if issubclass(typehint, google.protobuf.message.Message):
@@ -563,6 +612,13 @@ class TupleCoder(FastCoder):
def __repr__(self):
return 'TupleCoder[%s]' % ', '.join(str(c) for c in self._coders)
+ def __eq__(self, other):  # equal iff same type and equal component coders
+ return (type(self) == type(other)
+ and self._coders == other._coders)
+
+ def __hash__(self):  # agrees with __eq__; includes the type like sibling coders
+ return hash((type(self), self._coders))
+
class TupleSequenceCoder(FastCoder):
"""Coder of homogeneous tuple objects."""
class TupleSequenceCoder(FastCoder):
"""Coder of homogeneous tuple objects."""
@@ -586,6 +642,13 @@ class TupleSequenceCoder(FastCoder):
def __repr__(self):
return 'TupleSequenceCoder[%r]' % self._elem_coder
+ def __eq__(self, other):  # equal iff same type and equal element coder
+ return (type(self) == type(other)
+ and self._elem_coder == other._elem_coder)
+
+ def __hash__(self):  # agrees with __eq__
+ return hash((type(self), self._elem_coder))
+
class IterableCoder(FastCoder):
"""Coder of iterables of homogeneous objects."""
class IterableCoder(FastCoder):
"""Coder of iterables of homogeneous objects."""
@@ -619,6 +682,13 @@ class IterableCoder(FastCoder):
def __repr__(self):
return 'IterableCoder[%r]' % self._elem_coder
+ def __eq__(self, other):  # equal iff same type and equal element coder
+ return (type(self) == type(other)
+ and self._elem_coder == other._elem_coder)
+
+ def __hash__(self):  # agrees with __eq__
+ return hash((type(self), self._elem_coder))
+
class WindowCoder(PickleCoder):
"""Coder for windows in windowed values."""
class WindowCoder(PickleCoder):
"""Coder for windows in windowed values."""
@@ -663,6 +733,12 @@ class IntervalWindowCoder(FastCoder):
'@type': 'kind:interval_window',
}
+ def __eq__(self, other):  # equality depends only on the concrete type
+ return type(self) == type(other)
+
+ def __hash__(self):  # agrees with __eq__, so hash the type only
+ return hash(type(self))
+
class WindowedValueCoder(FastCoder):
"""Coder for windowed values."""
@@ -709,6 +785,16 @@ class WindowedValueCoder(FastCoder):
def __repr__(self):
return 'WindowedValueCoder[%s]' % self.wrapped_value_coder
+ def __eq__(self, other):  # same type and all three component coders equal
+ return (type(self) == type(other)
+ and self.wrapped_value_coder == other.wrapped_value_coder
+ and self.timestamp_coder == other.timestamp_coder
+ and self.window_coder == other.window_coder)
+
+ def __hash__(self):  # hashes the same three components __eq__ compares
+ return hash(
+ (self.wrapped_value_coder, self.timestamp_coder, self.window_coder))
+
class LengthPrefixCoder(FastCoder):
"""Coder which prefixes the length of the encoded object in the stream."""
@@ -740,3 +826,10 @@ class LengthPrefixCoder(FastCoder):
def __repr__(self):
return 'LengthPrefixCoder[%r]' % self._value_coder
+
+ def __eq__(self, other):  # equal iff same type and equal wrapped value coder
+ return (type(self) == type(other)
+ and self._value_coder == other._value_coder)
+
+ def __hash__(self):  # agrees with __eq__
+ return hash((type(self), self._value_coder))
http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 9edcf9b..7db39a9 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -52,14 +52,11 @@ import os
import shutil
import tempfile
-from apache_beam import coders
from apache_beam import pvalue
from apache_beam import typehints
from apache_beam.internal import pickler
from apache_beam.runners import create_runner
from apache_beam.runners import PipelineRunner
-from apache_beam.runners.api import beam_runner_api_pb2
-from apache_beam.transforms import core
from apache_beam.transforms import ptransform
from apache_beam.typehints import TypeCheckError
from apache_beam.utils.pipeline_options import PipelineOptions
@@ -443,62 +440,3 @@ class AppliedPTransform(object):
if v not in visited:
visited.add(v)
visitor.visit_value(v, self)
-
-
-class PipelineContextMap(object):
- """This is a bi-directional map between objects and ids.
-
- Under the hood it encodes and decodes these objects into runner API
- representations.
- """
- def __init__(self, context, obj_type, proto_map=None):
- self._pipeline_context = context
- self._obj_type = obj_type
- self._obj_to_id = {}
- self._id_to_obj = {}
- self._id_to_proto = proto_map if proto_map else {}
- self._counter = 0
-
- def _unique_ref(self):
- self._counter += 1
- return "ref_%s_%s" % (self._obj_type.__name__, self._counter)
-
- def populate_map(self, proto_map):
- for id, obj in self._id_to_obj:
- proto_map[id] = self._id_to_proto[id]
-
- def get_id(self, obj):
- if obj not in self._obj_to_id:
- id = self._unique_ref()
- self._id_to_obj[id] = obj
- self._obj_to_id[obj] = id
- self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
- return self._obj_to_id[obj]
-
- def get_by_id(self, id):
- if id not in self._id_to_obj:
- self._id_to_obj[id] = self._obj_type.from_runner_api(
- self._id_to_proto[id], self._pipeline_context)
- return self._id_to_obj[id]
-
-
-class PipelineContext(object):
-
- _COMPONENT_TYPES = {
- 'transforms': AppliedPTransform,
- 'pcollections': pvalue.PCollection,
- 'coders': coders.Coder,
- 'windowing_strategies': core.Windowing,
- # TODO: environment
- }
-
- def __init__(self, context_proto=None):
- for name, cls in self._COMPONENT_TYPES.items():
- setattr(self, name,
- PipelineContextMap(self, cls, getattr(context_proto, name, None)))
-
- def to_runner_api(self):
- context_proto = beam_runner_api_pb2.Components()
- for name, cls in self._COMPONENT_TYPES:
- getattr(self, name).populate_map(getattr(context_proto, name))
- return context_proto
http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/runners/pipeline_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
new file mode 100644
index 0000000..4f82774
--- /dev/null
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -0,0 +1,103 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Bi-directional mapping between pipeline components and runner API ids."""
+
+from apache_beam import coders
+from apache_beam import pipeline
+from apache_beam import pvalue
+from apache_beam.runners.api import beam_runner_api_pb2
+from apache_beam.transforms import core
+
+
+class _PipelineContextMap(object):
+  """This is a bi-directional map between objects and ids.
+
+  Under the hood it encodes and decodes these objects into runner API
+  representations.
+  """
+  def __init__(self, context, obj_type, proto_map=None):
+    self._pipeline_context = context
+    self._obj_type = obj_type
+    self._obj_to_id = {}
+    self._id_to_obj = {}
+    # Copy into a plain dict: get_id assigns new messages into this map,
+    # which a protobuf message-valued map field does not allow, and the
+    # caller's proto should not be mutated as a side effect.
+    self._id_to_proto = dict(proto_map) if proto_map else {}
+    self._counter = 0
+
+  def _unique_ref(self):
+    # Skip ids that are already taken (e.g. ones loaded from a deserialized
+    # proto) so a newly registered object never overwrites an existing entry.
+    while True:
+      self._counter += 1
+      ref = "ref_%s_%s" % (self._obj_type.__name__, self._counter)
+      if ref not in self._id_to_proto:
+        return ref
+
+  def populate_map(self, proto_map):
+    """Copies every id -> proto entry held by this map into proto_map."""
+    for ref, proto in self._id_to_proto.items():
+      proto_map[ref].CopyFrom(proto)
+
+  def get_id(self, obj):
+    """Returns the id for obj, registering and encoding it on first use."""
+    if obj not in self._obj_to_id:
+      ref = self._unique_ref()
+      self._id_to_obj[ref] = obj
+      self._obj_to_id[obj] = ref
+      self._id_to_proto[ref] = obj.to_runner_api(self._pipeline_context)
+    return self._obj_to_id[obj]
+
+  def get_by_id(self, id):
+    """Returns the object for id, decoding it from its proto on first use."""
+    if id not in self._id_to_obj:
+      self._id_to_obj[id] = self._obj_type.from_runner_api(
+          self._id_to_proto[id], self._pipeline_context)
+    return self._id_to_obj[id]
+
+
+class PipelineContext(object):
+  """Used for accessing and constructing the referenced objects of a Pipeline.
+  """
+
+  _COMPONENT_TYPES = {
+      'transforms': pipeline.AppliedPTransform,
+      'pcollections': pvalue.PCollection,
+      'coders': coders.Coder,
+      'windowing_strategies': core.Windowing,
+      # TODO: environment
+  }
+
+  def __init__(self, context_proto=None):
+    for name, cls in self._COMPONENT_TYPES.items():
+      setattr(
+          self, name, _PipelineContextMap(
+              self, cls, getattr(context_proto, name, None)))
+
+  @staticmethod
+  def from_runner_api(proto):
+    """Builds a context whose maps are seeded from a Components proto."""
+    return PipelineContext(proto)
+
+  def to_runner_api(self):
+    """Returns a Components proto holding every component in this context."""
+    context_proto = beam_runner_api_pb2.Components()
+    for name in self._COMPONENT_TYPES:
+      getattr(self, name).populate_map(getattr(context_proto, name))
+    return context_proto
http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/runners/pipeline_context_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/pipeline_context_test.py b/sdks/python/apache_beam/runners/pipeline_context_test.py
new file mode 100644
index 0000000..6091ed8
--- /dev/null
+++ b/sdks/python/apache_beam/runners/pipeline_context_test.py
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the pipeline context."""
+
+import unittest
+
+from apache_beam import coders
+from apache_beam.runners import pipeline_context
+
+
+class PipelineContextTest(unittest.TestCase):
+
+  def test_deduplication(self):
+    """Registering two equal coders yields the same id."""
+    context = pipeline_context.PipelineContext()
+    bytes_coder_ref = context.coders.get_id(coders.BytesCoder())
+    bytes_coder_ref2 = context.coders.get_id(coders.BytesCoder())
+    self.assertEqual(bytes_coder_ref, bytes_coder_ref2)
+
+  def test_serialization(self):
+    """Coders survive a round trip through the runner API proto."""
+    context = pipeline_context.PipelineContext()
+    float_coder_ref = context.coders.get_id(coders.FloatCoder())
+    bytes_coder_ref = context.coders.get_id(coders.BytesCoder())
+    proto = context.to_runner_api()
+    context2 = pipeline_context.PipelineContext.from_runner_api(proto)
+    self.assertEqual(
+        coders.FloatCoder(),
+        context2.coders.get_by_id(float_coder_ref))
+    self.assertEqual(
+        coders.BytesCoder(),
+        context2.coders.get_by_id(bytes_coder_ref))
+
+
+if __name__ == '__main__':
+  unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 1fc63b2..3251671 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1235,6 +1235,7 @@ class Windowing(object):
trigger=self.triggerfn.to_runner_api(context),
accumulation_mode=self.accumulation_mode,
output_time=self.output_time_fn,
+ # TODO(robertwb): Support EMIT_IF_NONEMPTY
closing_behavior=beam_runner_api_pb2.EMIT_ALWAYS,
allowed_lateness=0)
http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index cc9e0f5..827aa33 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -25,6 +25,7 @@ import unittest
import yaml
import apache_beam as beam
+from apache_beam.runners import pipeline_context
from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms import trigger
from apache_beam.transforms.core import Windowing
@@ -392,22 +393,7 @@ class RunnerApiTest(unittest.TestCase):
AfterWatermark(early=AfterCount(1000), late=AfterCount(1)),
Repeatedly(AfterCount(100)),
trigger.OrFinally(AfterCount(3), AfterCount(10))):
- context = beam.pipeline.PipelineContext()
- self.assertEqual(
- trigger_fn,
- TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), context))
-
- def test_windowing_strategy_encoding(self):
- for trigger_fn in (
- DefaultTrigger(),
- AfterAll(AfterCount(1), AfterCount(10)),
- AfterFirst(AfterCount(10), AfterCount(100)),
- AfterEach(AfterCount(100), AfterCount(1000)),
- AfterWatermark(early=AfterCount(1000)),
- AfterWatermark(early=AfterCount(1000), late=AfterCount(1)),
- Repeatedly(AfterCount(100)),
- trigger.OrFinally(AfterCount(3), AfterCount(10))):
- context = beam.pipeline.PipelineContext()
+ context = pipeline_context.PipelineContext()
self.assertEqual(
trigger_fn,
TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), context))
http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index c763a96..3878dff 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -73,6 +73,7 @@ class OutputTimeFn(object):
OUTPUT_AT_EOW = beam_runner_api_pb2.END_OF_WINDOW
OUTPUT_AT_EARLIEST = beam_runner_api_pb2.EARLIEST_IN_PANE
OUTPUT_AT_LATEST = beam_runner_api_pb2.LATEST_IN_PANE
+ # TODO(robertwb): Add this to the runner API or remove it.
OUTPUT_AT_EARLIEST_TRANSFORMED = 'OUTPUT_AT_EARLIEST_TRANSFORMED'
@staticmethod
@@ -167,7 +168,6 @@ class WindowFn(object):
return pickler.loads(fn_parameter.value)
def to_runner_api_parameter(self, context):
- raise TypeError(self)
return (urns.PICKLED_WINDOW_FN,
wrappers_pb2.BytesValue(value=pickler.dumps(self)))
http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index c79739a..99be02c 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -19,7 +19,7 @@
import unittest
-from apache_beam import pipeline
+from apache_beam.runners import pipeline_context
from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms import CombinePerKey
from apache_beam.transforms import combiners
@@ -238,7 +238,7 @@ class RunnerApiTest(unittest.TestCase):
FixedWindows(37),
SlidingWindows(2, 389),
Sessions(5077)):
- context = pipeline.PipelineContext()
+ context = pipeline_context.PipelineContext()
self.assertEqual(
window_fn,
WindowFn.from_runner_api(window_fn.to_runner_api(context), context))
@@ -251,7 +251,7 @@ class RunnerApiTest(unittest.TestCase):
Windowing(SlidingWindows(10, 15, 21), AfterCount(28),
output_time_fn=OutputTimeFn.OUTPUT_AT_LATEST,
accumulation_mode=AccumulationMode.DISCARDING)):
- context = pipeline.PipelineContext()
+ context = pipeline_context.PipelineContext()
self.assertEqual(
windowing,
Windowing.from_runner_api(windowing.to_runner_api(context), context))
http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 186c99c..936e2cb 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -21,4 +21,4 @@ FIXED_WINDOWS_FN = "beam:window_fn:fixed_windows:v0.1"
SLIDING_WINDOWS_FN = "beam:window_fn:sliding_windows:v0.1"
SESSION_WINDOWS_FN = "beam:window_fn:session_windows:v0.1"
-PICKLED_CODER = "dataflow:coder:pickled_python:v0.1"
+PICKLED_CODER = "beam:coder:pickled_python:v0.1"  # "beam:" prefix, matching the window_fn URNs above