You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/03/31 17:53:39 UTC
[1/2] beam git commit: Add TestStream to Python SDK
Repository: beam
Updated Branches:
refs/heads/master 62473ae4b -> 023e6ab94
Add TestStream to Python SDK
The TestStream will be used for verifying streaming runner semantics.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/55db47d5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/55db47d5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/55db47d5
Branch: refs/heads/master
Commit: 55db47d50bb97b238926c2a1b0b80c36b5345d44
Parents: 62473ae
Author: Charles Chen <cc...@google.com>
Authored: Thu Mar 30 18:20:04 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Mar 31 10:53:16 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/transforms/window.py | 5 +
.../apache_beam/transforms/window_test.py | 6 +
sdks/python/apache_beam/utils/test_stream.py | 163 +++++++++++++++++++
.../apache_beam/utils/test_stream_test.py | 82 ++++++++++
4 files changed, 256 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/55db47d5/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 3878dff..dcc58b7 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -246,6 +246,11 @@ class TimestampedValue(object):
self.value = value
self.timestamp = Timestamp.of(timestamp)
+ def __cmp__(self, other):
+ if type(self) is not type(other):
+ return cmp(type(self), type(other))
+ return cmp((self.value, self.timestamp), (other.value, other.timestamp))
+
class GlobalWindow(BoundedWindow):
"""The default window into which all data is placed (via GlobalWindows)."""
http://git-wip-us.apache.org/repos/asf/beam/blob/55db47d5/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 11c8a68..1ac95e4 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -62,6 +62,12 @@ reify_windows = core.ParDo(ReifyWindowsFn())
class WindowTest(unittest.TestCase):
+ def test_timestamped_value_cmp(self):
+ self.assertEqual(TimestampedValue('a', 2), TimestampedValue('a', 2))
+ self.assertEqual(TimestampedValue('a', 2), TimestampedValue('a', 2.0))
+ self.assertNotEqual(TimestampedValue('a', 2), TimestampedValue('a', 2.1))
+ self.assertNotEqual(TimestampedValue('a', 2), TimestampedValue('b', 2))
+
def test_global_window(self):
self.assertEqual(GlobalWindow(), GlobalWindow())
self.assertNotEqual(GlobalWindow(),
http://git-wip-us.apache.org/repos/asf/beam/blob/55db47d5/sdks/python/apache_beam/utils/test_stream.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/test_stream.py b/sdks/python/apache_beam/utils/test_stream.py
new file mode 100644
index 0000000..7ae27b7
--- /dev/null
+++ b/sdks/python/apache_beam/utils/test_stream.py
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Provides TestStream for verifying streaming runner semantics."""
+
+from abc import ABCMeta
+from abc import abstractmethod
+
+from apache_beam import coders
+from apache_beam import pvalue
+from apache_beam.transforms import PTransform
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.utils import timestamp
+from apache_beam.utils.windowed_value import WindowedValue
+
+
+class Event(object):
+ """Test stream event to be emitted during execution of a TestStream."""
+
+ __metaclass__ = ABCMeta
+
+ def __cmp__(self, other):
+ if type(self) is not type(other):
+ return cmp(type(self), type(other))
+ return self._typed_cmp(other)
+
+ @abstractmethod
+ def _typed_cmp(self, other):
+ raise NotImplementedError
+
+
+class ElementEvent(Event):
+ """Element-producing test stream event."""
+
+ def __init__(self, timestamped_values):
+ self.timestamped_values = timestamped_values
+
+ def _typed_cmp(self, other):
+ return cmp(self.timestamped_values, other.timestamped_values)
+
+
+class WatermarkEvent(Event):
+ """Watermark-advancing test stream event."""
+
+ def __init__(self, new_watermark):
+ self.new_watermark = timestamp.Timestamp.of(new_watermark)
+
+ def _typed_cmp(self, other):
+ return cmp(self.new_watermark, other.new_watermark)
+
+
+class ProcessingTimeEvent(Event):
+ """Processing time-advancing test stream event."""
+
+ def __init__(self, advance_by):
+ self.advance_by = timestamp.Duration.of(advance_by)
+
+ def _typed_cmp(self, other):
+ return cmp(self.advance_by, other.advance_by)
+
+
+class TestStream(PTransform):
+ """Test stream that generates events on an unbounded PCollection of elements.
+
+ Each event emits elements, advances the watermark or advances the processing
+ time. After all of the specified elements are emitted, ceases to produce
+ output.
+ """
+
+ def __init__(self, coder=coders.FastPrimitivesCoder):
+ assert coder is not None
+ self.coder = coder
+ self.current_watermark = timestamp.MIN_TIMESTAMP
+ self.events = []
+
+ def expand(self, pbegin):
+ assert isinstance(pbegin, pvalue.PBegin)
+ self.pipeline = pbegin.pipeline
+ return pvalue.PCollection(self.pipeline)
+
+ def _infer_output_coder(self, input_type=None, input_coder=None):
+ return self.coder
+
+ def _add(self, event):
+ if isinstance(event, ElementEvent):
+ for tv in event.timestamped_values:
+ assert tv.timestamp < timestamp.MAX_TIMESTAMP, (
+ 'Element timestamp must be before timestamp.MAX_TIMESTAMP.')
+ elif isinstance(event, WatermarkEvent):
+ assert event.new_watermark > self.current_watermark, (
+ 'Watermark must strictly-monotonically advance.')
+ self.current_watermark = event.new_watermark
+ elif isinstance(event, ProcessingTimeEvent):
+ assert event.advance_by > 0, (
+ 'Must advance processing time by positive amount.')
+ else:
+ raise ValueError('Unknown event: %s' % event)
+ self.events.append(event)
+
+ def add_elements(self, elements):
+ """Add elements to the TestStream.
+
+ Elements added to the TestStream will be produced during pipeline execution.
+ These elements can be TimestampedValue, WindowedValue or raw unwrapped
+ elements that are serializable using the TestStream's specified Coder. When
+ a TimestampedValue or a WindowedValue element is used, the timestamp of the
+ TimestampedValue or WindowedValue will be the timestamp of the produced
+ element; otherwise, the current watermark timestamp will be used for that
+ element. The windows of a given WindowedValue are ignored by the
+ TestStream.
+ """
+ timestamped_values = []
+ for element in elements:
+ if isinstance(element, TimestampedValue):
+ timestamped_values.append(element)
+ elif isinstance(element, WindowedValue):
+ # Drop windows for elements in test stream.
+ timestamped_values.append(
+ TimestampedValue(element.value, element.timestamp))
+ else:
+ # Add elements with timestamp equal to current watermark.
+ timestamped_values.append(
+ TimestampedValue(element, self.current_watermark))
+ self._add(ElementEvent(timestamped_values))
+ return self
+
+ def advance_watermark_to(self, new_watermark):
+ """Advance the watermark to a given Unix timestamp.
+
+ The Unix timestamp value used must be later than the previous watermark
+ value and should be given as an int, float or utils.timestamp.Timestamp
+ object.
+ """
+ self._add(WatermarkEvent(new_watermark))
+ return self
+
+ def advance_watermark_to_infinity(self):
+ """Advance the watermark to the end of time."""
+ self.advance_watermark_to(timestamp.MAX_TIMESTAMP)
+ return self
+
+ def advance_processing_time(self, advance_by):
+ """Advance the current processing time by a given duration in seconds.
+
+ The duration must be a positive second duration and should be given as an
+ int, float or utils.timestamp.Duration object.
+ """
+ self._add(ProcessingTimeEvent(advance_by))
+ return self
http://git-wip-us.apache.org/repos/asf/beam/blob/55db47d5/sdks/python/apache_beam/utils/test_stream_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/test_stream_test.py b/sdks/python/apache_beam/utils/test_stream_test.py
new file mode 100644
index 0000000..cc207ee
--- /dev/null
+++ b/sdks/python/apache_beam/utils/test_stream_test.py
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the test_stream module."""
+
+import unittest
+
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.utils import timestamp
+from apache_beam.utils.test_stream import ElementEvent
+from apache_beam.utils.test_stream import ProcessingTimeEvent
+from apache_beam.utils.test_stream import TestStream
+from apache_beam.utils.test_stream import WatermarkEvent
+from apache_beam.utils.windowed_value import WindowedValue
+
+
+class TestStreamTest(unittest.TestCase):
+
+ def test_basic_test_stream(self):
+ test_stream = (TestStream()
+ .advance_watermark_to(0)
+ .add_elements([
+ 'a',
+ WindowedValue('b', 3, []),
+ TimestampedValue('c', 6)])
+ .advance_processing_time(10)
+ .advance_watermark_to(8)
+ .add_elements(['d'])
+ .advance_watermark_to_infinity())
+ self.assertEqual(
+ test_stream.events,
+ [
+ WatermarkEvent(0),
+ ElementEvent([
+ TimestampedValue('a', 0),
+ TimestampedValue('b', 3),
+ TimestampedValue('c', 6),
+ ]),
+ ProcessingTimeEvent(10),
+ WatermarkEvent(8),
+ ElementEvent([
+ TimestampedValue('d', 8),
+ ]),
+ WatermarkEvent(timestamp.MAX_TIMESTAMP),
+ ]
+ )
+
+ def test_test_stream_errors(self):
+ with self.assertRaises(AssertionError, msg=(
+ 'Watermark must strictly-monotonically advance.')):
+ _ = (TestStream()
+ .advance_watermark_to(5)
+ .advance_watermark_to(4))
+
+ with self.assertRaises(AssertionError, msg=(
+ 'Must advance processing time by positive amount.')):
+ _ = (TestStream()
+ .advance_processing_time(-1))
+
+ with self.assertRaises(AssertionError, msg=(
+ 'Element timestamp must be before timestamp.MAX_TIMESTAMP.')):
+ _ = (TestStream()
+ .add_elements([
+ TimestampedValue('a', timestamp.MAX_TIMESTAMP)
+ ]))
+
+if __name__ == '__main__':
+ unittest.main()
[2/2] beam git commit: This closes #2389
Posted by al...@apache.org.
This closes #2389
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/023e6ab9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/023e6ab9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/023e6ab9
Branch: refs/heads/master
Commit: 023e6ab94aad0990681eeaa7125d45260cbb1a4c
Parents: 62473ae 55db47d
Author: Ahmet Altay <al...@google.com>
Authored: Fri Mar 31 10:53:20 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Mar 31 10:53:20 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/transforms/window.py | 5 +
.../apache_beam/transforms/window_test.py | 6 +
sdks/python/apache_beam/utils/test_stream.py | 163 +++++++++++++++++++
.../apache_beam/utils/test_stream_test.py | 82 ++++++++++
4 files changed, 256 insertions(+)
----------------------------------------------------------------------