You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/05/09 16:37:40 UTC
[3/4] beam git commit: [BEAM-2184] Rename OutputTimeFn to TimestampCombiner.
[BEAM-2184] Rename OutputTimeFn to TimestampCombiner.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2aa70944
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2aa70944
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2aa70944
Branch: refs/heads/release-2.0.0
Commit: 2aa70944ed88616e02e46c35549b74a784dfa4cc
Parents: f3f47f6
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Mon May 8 15:54:23 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue May 9 09:37:08 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/transforms/core.py | 23 ++++++++-------
sdks/python/apache_beam/transforms/timeutil.py | 22 +++++++-------
sdks/python/apache_beam/transforms/trigger.py | 26 ++++++++---------
.../apache_beam/transforms/trigger_test.py | 9 +++---
.../transforms/trigger_transcripts.yaml | 30 ++++++++++----------
sdks/python/apache_beam/transforms/window.py | 22 +++++++-------
.../apache_beam/transforms/window_test.py | 4 +--
7 files changed, 69 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2aa70944/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 9367e6f..7ca1632 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -33,7 +33,7 @@ from apache_beam.transforms.display import HasDisplayData, DisplayDataItem
from apache_beam.transforms.ptransform import PTransform
from apache_beam.transforms.ptransform import PTransformWithSideInputs
from apache_beam.transforms.window import MIN_TIMESTAMP
-from apache_beam.transforms.window import OutputTimeFn
+from apache_beam.transforms.window import TimestampCombiner
from apache_beam.transforms.window import WindowedValue
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms.window import GlobalWindows
@@ -1172,7 +1172,7 @@ class Partition(PTransformWithSideInputs):
class Windowing(object):
def __init__(self, windowfn, triggerfn=None, accumulation_mode=None,
- output_time_fn=None):
+ timestamp_combiner=None):
global AccumulationMode, DefaultTrigger # pylint: disable=global-variable-not-assigned
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam.transforms.trigger import AccumulationMode, DefaultTrigger
@@ -1192,17 +1192,18 @@ class Windowing(object):
self.windowfn = windowfn
self.triggerfn = triggerfn
self.accumulation_mode = accumulation_mode
- self.output_time_fn = output_time_fn or OutputTimeFn.OUTPUT_AT_EOW
+ self.timestamp_combiner = (
+ timestamp_combiner or TimestampCombiner.OUTPUT_AT_EOW)
self._is_default = (
self.windowfn == GlobalWindows() and
self.triggerfn == DefaultTrigger() and
self.accumulation_mode == AccumulationMode.DISCARDING and
- self.output_time_fn == OutputTimeFn.OUTPUT_AT_EOW)
+ self.timestamp_combiner == TimestampCombiner.OUTPUT_AT_EOW)
def __repr__(self):
return "Windowing(%s, %s, %s, %s)" % (self.windowfn, self.triggerfn,
self.accumulation_mode,
- self.output_time_fn)
+ self.timestamp_combiner)
def __eq__(self, other):
if type(self) == type(other):
@@ -1212,7 +1213,7 @@ class Windowing(object):
self.windowfn == other.windowfn
and self.triggerfn == other.triggerfn
and self.accumulation_mode == other.accumulation_mode
- and self.output_time_fn == other.output_time_fn)
+ and self.timestamp_combiner == other.timestamp_combiner)
return False
def is_default(self):
@@ -1229,7 +1230,7 @@ class Windowing(object):
self.windowfn.get_window_coder()),
trigger=self.triggerfn.to_runner_api(context),
accumulation_mode=self.accumulation_mode,
- output_time=self.output_time_fn,
+ output_time=self.timestamp_combiner,
# TODO(robertwb): Support EMIT_IF_NONEMPTY
closing_behavior=beam_runner_api_pb2.EMIT_ALWAYS,
allowed_lateness=0)
@@ -1242,7 +1243,7 @@ class Windowing(object):
windowfn=WindowFn.from_runner_api(proto.window_fn, context),
triggerfn=TriggerFn.from_runner_api(proto.trigger, context),
accumulation_mode=proto.accumulation_mode,
- output_time_fn=proto.output_time)
+ timestamp_combiner=proto.output_time)
@typehints.with_input_types(T)
@@ -1275,9 +1276,9 @@ class WindowInto(ParDo):
"""
triggerfn = kwargs.pop('trigger', None)
accumulation_mode = kwargs.pop('accumulation_mode', None)
- output_time_fn = kwargs.pop('output_time_fn', None)
+ timestamp_combiner = kwargs.pop('timestamp_combiner', None)
self.windowing = Windowing(windowfn, triggerfn, accumulation_mode,
- output_time_fn)
+ timestamp_combiner)
super(WindowInto, self).__init__(self.WindowIntoFn(self.windowing))
def get_windowing(self, unused_inputs):
@@ -1307,7 +1308,7 @@ class WindowInto(ParDo):
windowing.windowfn,
trigger=windowing.triggerfn,
accumulation_mode=windowing.accumulation_mode,
- output_time_fn=windowing.output_time_fn)
+ timestamp_combiner=windowing.timestamp_combiner)
PTransform.register_urn(
http://git-wip-us.apache.org/repos/asf/beam/blob/2aa70944/sdks/python/apache_beam/transforms/timeutil.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py
index 5453b20..435d41b 100644
--- a/sdks/python/apache_beam/transforms/timeutil.py
+++ b/sdks/python/apache_beam/transforms/timeutil.py
@@ -49,8 +49,8 @@ class TimeDomain(object):
raise ValueError('Unknown time domain: %s' % domain)
-class OutputTimeFnImpl(object):
- """Implementation of OutputTimeFn."""
+class TimestampCombinerImpl(object):
+ """Implementation of TimestampCombiner."""
__metaclass__ = ABCMeta
@@ -78,8 +78,8 @@ class OutputTimeFnImpl(object):
return self.combine_all(merging_timestamps)
-class DependsOnlyOnWindow(OutputTimeFnImpl):
- """OutputTimeFnImpl that only depends on the window."""
+class DependsOnlyOnWindow(TimestampCombinerImpl):
+ """TimestampCombinerImpl that only depends on the window."""
__metaclass__ = ABCMeta
@@ -92,8 +92,8 @@ class DependsOnlyOnWindow(OutputTimeFnImpl):
return self.assign_output_time(result_window, None)
-class OutputAtEarliestInputTimestampImpl(OutputTimeFnImpl):
- """OutputTimeFnImpl outputting at earliest input timestamp."""
+class OutputAtEarliestInputTimestampImpl(TimestampCombinerImpl):
+ """TimestampCombinerImpl outputting at earliest input timestamp."""
def assign_output_time(self, window, input_timestamp):
return input_timestamp
@@ -103,8 +103,8 @@ class OutputAtEarliestInputTimestampImpl(OutputTimeFnImpl):
return min(output_timestamp, other_output_timestamp)
-class OutputAtEarliestTransformedInputTimestampImpl(OutputTimeFnImpl):
- """OutputTimeFnImpl outputting at earliest input timestamp."""
+class OutputAtEarliestTransformedInputTimestampImpl(TimestampCombinerImpl):
+ """TimestampCombinerImpl outputting at earliest input timestamp."""
def __init__(self, window_fn):
self.window_fn = window_fn
@@ -116,8 +116,8 @@ class OutputAtEarliestTransformedInputTimestampImpl(OutputTimeFnImpl):
return min(output_timestamp, other_output_timestamp)
-class OutputAtLatestInputTimestampImpl(OutputTimeFnImpl):
- """OutputTimeFnImpl outputting at latest input timestamp."""
+class OutputAtLatestInputTimestampImpl(TimestampCombinerImpl):
+ """TimestampCombinerImpl outputting at latest input timestamp."""
def assign_output_time(self, window, input_timestamp):
return input_timestamp
@@ -127,7 +127,7 @@ class OutputAtLatestInputTimestampImpl(OutputTimeFnImpl):
class OutputAtEndOfWindowImpl(DependsOnlyOnWindow):
- """OutputTimeFnImpl outputting at end of window."""
+ """TimestampCombinerImpl outputting at end of window."""
def assign_output_time(self, window, unused_input_timestamp):
return window.end
http://git-wip-us.apache.org/repos/asf/beam/blob/2aa70944/sdks/python/apache_beam/transforms/trigger.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index b9786f4..bcb9dd3 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -32,7 +32,7 @@ from apache_beam.transforms.timeutil import MAX_TIMESTAMP
from apache_beam.transforms.timeutil import MIN_TIMESTAMP
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.window import GlobalWindow
-from apache_beam.transforms.window import OutputTimeFn
+from apache_beam.transforms.window import TimestampCombiner
from apache_beam.transforms.window import WindowedValue
from apache_beam.transforms.window import WindowFn
from apache_beam.runners.api import beam_runner_api_pb2
@@ -100,17 +100,17 @@ class ListStateTag(StateTag):
class WatermarkHoldStateTag(StateTag):
- def __init__(self, tag, output_time_fn_impl):
+ def __init__(self, tag, timestamp_combiner_impl):
super(WatermarkHoldStateTag, self).__init__(tag)
- self.output_time_fn_impl = output_time_fn_impl
+ self.timestamp_combiner_impl = timestamp_combiner_impl
def __repr__(self):
return 'WatermarkHoldStateTag(%s, %s)' % (self.tag,
- self.output_time_fn_impl)
+ self.timestamp_combiner_impl)
def with_prefix(self, prefix):
return WatermarkHoldStateTag(prefix + self.tag,
- self.output_time_fn_impl)
+ self.timestamp_combiner_impl)
# pylint: disable=unused-argument
@@ -750,7 +750,7 @@ class MergeableStateAdapter(SimpleState):
elif isinstance(tag, ListStateTag):
return [v for vs in values for v in vs]
elif isinstance(tag, WatermarkHoldStateTag):
- return tag.output_time_fn_impl.combine_all(values)
+ return tag.timestamp_combiner_impl.combine_all(values)
else:
raise ValueError('Invalid tag.', tag)
@@ -909,11 +909,11 @@ class GeneralTriggerDriver(TriggerDriver):
def __init__(self, windowing):
self.window_fn = windowing.windowfn
- self.output_time_fn_impl = OutputTimeFn.get_impl(windowing.output_time_fn,
- self.window_fn)
+ self.timestamp_combiner_impl = TimestampCombiner.get_impl(
+ windowing.timestamp_combiner, self.window_fn)
# pylint: disable=invalid-name
- self.WATERMARK_HOLD = WatermarkHoldStateTag('watermark',
- self.output_time_fn_impl)
+ self.WATERMARK_HOLD = WatermarkHoldStateTag(
+ 'watermark', self.timestamp_combiner_impl)
# pylint: enable=invalid-name
self.trigger_fn = windowing.triggerfn
self.accumulation_mode = windowing.accumulation_mode
@@ -965,10 +965,10 @@ class GeneralTriggerDriver(TriggerDriver):
continue
# Add watermark hold.
# TODO(ccy): Add late data and garbage-collection hold support.
- output_time = self.output_time_fn_impl.merge(
+ output_time = self.timestamp_combiner_impl.merge(
window,
(element_output_time for element_output_time in
- (self.output_time_fn_impl.assign_output_time(window, timestamp)
+ (self.timestamp_combiner_impl.assign_output_time(window, timestamp)
for unused_value, timestamp in elements)
if element_output_time >= output_watermark))
if output_time is not None:
@@ -1075,7 +1075,7 @@ class InMemoryUnmergedState(UnmergedState):
elif isinstance(tag, ListStateTag):
return values
elif isinstance(tag, WatermarkHoldStateTag):
- return tag.output_time_fn_impl.combine_all(values)
+ return tag.timestamp_combiner_impl.combine_all(values)
else:
raise ValueError('Invalid tag.', tag)
http://git-wip-us.apache.org/repos/asf/beam/blob/2aa70944/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 914babb..38871fe 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -44,7 +44,7 @@ from apache_beam.transforms.util import assert_that, equal_to
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.window import IntervalWindow
from apache_beam.transforms.window import MIN_TIMESTAMP
-from apache_beam.transforms.window import OutputTimeFn
+from apache_beam.transforms.window import TimestampCombiner
from apache_beam.transforms.window import Sessions
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms.window import WindowedValue
@@ -522,11 +522,12 @@ class TranscriptTest(unittest.TestCase):
trigger_fn = parse_fn(spec.get('trigger_fn', 'Default'), trigger_names)
accumulation_mode = getattr(
AccumulationMode, spec.get('accumulation_mode', 'ACCUMULATING').upper())
- output_time_fn = getattr(
- OutputTimeFn, spec.get('output_time_fn', 'OUTPUT_AT_EOW').upper())
+ timestamp_combiner = getattr(
+ TimestampCombiner,
+ spec.get('timestamp_combiner', 'OUTPUT_AT_EOW').upper())
driver = GeneralTriggerDriver(
- Windowing(window_fn, trigger_fn, accumulation_mode, output_time_fn))
+ Windowing(window_fn, trigger_fn, accumulation_mode, timestamp_combiner))
state = InMemoryUnmergedState()
output = []
watermark = MIN_TIMESTAMP
http://git-wip-us.apache.org/repos/asf/beam/blob/2aa70944/sdks/python/apache_beam/transforms/trigger_transcripts.yaml
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_transcripts.yaml b/sdks/python/apache_beam/transforms/trigger_transcripts.yaml
index f87cd1d..a736e94 100644
--- a/sdks/python/apache_beam/transforms/trigger_transcripts.yaml
+++ b/sdks/python/apache_beam/transforms/trigger_transcripts.yaml
@@ -29,7 +29,7 @@ transcript: # Ordered list of events.
name: fixed_default_late_data
window_fn: FixedWindows(10)
trigger_fn: Default
-output_time_fn: OUTPUT_AT_EOW
+timestamp_combiner: OUTPUT_AT_EOW
transcript:
- input: [1, 2, 3, 10, 11, 25]
- watermark: 100
@@ -42,10 +42,10 @@ transcript:
- {window: [0, 9], values: [1, 2, 3, 7], timestamp: 10, late: true}
---
-name: output_time_fn_earliest
+name: timestamp_combiner_earliest
window_fn: FixedWindows(10)
trigger_fn: Default
-output_time_fn: OUTPUT_AT_EARLIEST
+timestamp_combiner: OUTPUT_AT_EARLIEST
transcript:
- input: [1, 2, 3, 10, 11, 25]
- watermark: 100
@@ -55,10 +55,10 @@ transcript:
- {window: [20, 29], values: [25], timestamp: 25, late: false}
---
-name: output_time_fn_latest
+name: timestamp_combiner_latest
window_fn: FixedWindows(10)
trigger_fn: Default
-output_time_fn: OUTPUT_AT_LATEST
+timestamp_combiner: OUTPUT_AT_LATEST
transcript:
- input: [1, 2, 3, 10, 11, 25]
- watermark: 100
@@ -69,10 +69,10 @@ transcript:
---
# Test that custom timestamping is not invoked.
-name: output_time_fn_custom_timestamping_eow
+name: timestamp_combiner_custom_timestamping_eow
window_fn: CustomTimestampingFixedWindowsWindowFn(10)
trigger_fn: Default
-output_time_fn: OUTPUT_AT_EOW
+timestamp_combiner: OUTPUT_AT_EOW
transcript:
- input: [1, 2, 3, 10, 11, 25]
- watermark: 100
@@ -83,10 +83,10 @@ transcript:
---
# Test that custom timestamping is not invoked.
-name: output_time_fn_custom_timestamping_earliest
+name: timestamp_combiner_custom_timestamping_earliest
window_fn: CustomTimestampingFixedWindowsWindowFn(10)
trigger_fn: Default
-output_time_fn: OUTPUT_AT_EARLIEST
+timestamp_combiner: OUTPUT_AT_EARLIEST
transcript:
- input: [1, 2, 3, 10, 11, 25]
- watermark: 100
@@ -97,10 +97,10 @@ transcript:
---
# Test that custom timestamping is in fact invoked.
-name: output_time_fn_custom_timestamping_earliest
+name: timestamp_combiner_custom_timestamping_earliest
window_fn: CustomTimestampingFixedWindowsWindowFn(10)
trigger_fn: Default
-output_time_fn: OUTPUT_AT_EARLIEST_TRANSFORMED
+timestamp_combiner: OUTPUT_AT_EARLIEST_TRANSFORMED
transcript:
- input: [1, 2, 3, 10, 11, 25]
- watermark: 100
@@ -113,7 +113,7 @@ transcript:
name: early_late_sessions
window_fn: Sessions(10)
trigger_fn: AfterWatermark(early=AfterCount(2), late=AfterCount(3))
-output_time_fn: OUTPUT_AT_EOW
+timestamp_combiner: OUTPUT_AT_EOW
transcript:
- input: [1, 2, 3]
- expect:
@@ -136,7 +136,7 @@ transcript:
name: garbage_collection
window_fn: FixedWindows(10)
trigger_fn: AfterCount(2)
-output_time_fn: OUTPUT_AT_EOW
+timestamp_combiner: OUTPUT_AT_EOW
allowed_lateness: 10
accumulation_mode: discarding
transcript:
@@ -153,7 +153,7 @@ transcript:
name: known_late_data_watermark
window_fn: FixedWindows(10)
trigger_fn: Default
-output_time_fn: OUTPUT_AT_EARLIEST
+timestamp_combiner: OUTPUT_AT_EARLIEST
transcript:
- watermark: 5
- input: [2, 3, 7, 8]
@@ -165,7 +165,7 @@ transcript:
name: known_late_data_no_watermark_hold_possible
window_fn: FixedWindows(10)
trigger_fn: Default
-output_time_fn: OUTPUT_AT_EARLIEST
+timestamp_combiner: OUTPUT_AT_EARLIEST
transcript:
- watermark: 8
- input: [2, 3, 7]
http://git-wip-us.apache.org/repos/asf/beam/blob/2aa70944/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 9c4b109..44a5a26 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -67,7 +67,7 @@ from apache_beam.utils import urns
# TODO(ccy): revisit naming and semantics once Java Apache Beam finalizes their
# behavior.
-class OutputTimeFn(object):
+class TimestampCombiner(object):
"""Determines how output timestamps of grouping operations are assigned."""
OUTPUT_AT_EOW = beam_runner_api_pb2.END_OF_WINDOW
@@ -77,17 +77,17 @@ class OutputTimeFn(object):
OUTPUT_AT_EARLIEST_TRANSFORMED = 'OUTPUT_AT_EARLIEST_TRANSFORMED'
@staticmethod
- def get_impl(output_time_fn, window_fn):
- if output_time_fn == OutputTimeFn.OUTPUT_AT_EOW:
+ def get_impl(timestamp_combiner, window_fn):
+ if timestamp_combiner == TimestampCombiner.OUTPUT_AT_EOW:
return timeutil.OutputAtEndOfWindowImpl()
- elif output_time_fn == OutputTimeFn.OUTPUT_AT_EARLIEST:
+ elif timestamp_combiner == TimestampCombiner.OUTPUT_AT_EARLIEST:
return timeutil.OutputAtEarliestInputTimestampImpl()
- elif output_time_fn == OutputTimeFn.OUTPUT_AT_LATEST:
+ elif timestamp_combiner == TimestampCombiner.OUTPUT_AT_LATEST:
return timeutil.OutputAtLatestInputTimestampImpl()
- elif output_time_fn == OutputTimeFn.OUTPUT_AT_EARLIEST_TRANSFORMED:
+ elif timestamp_combiner == TimestampCombiner.OUTPUT_AT_EARLIEST_TRANSFORMED:
return timeutil.OutputAtEarliestTransformedInputTimestampImpl(window_fn)
else:
- raise ValueError('Invalid OutputTimeFn: %s.' % output_time_fn)
+ raise ValueError('Invalid TimestampCombiner: %s.' % timestamp_combiner)
class WindowFn(urns.RunnerApiFn):
@@ -132,10 +132,10 @@ class WindowFn(urns.RunnerApiFn):
def get_transformed_output_time(self, window, input_timestamp): # pylint: disable=unused-argument
"""Given input time and output window, returns output time for window.
- If OutputTimeFn.OUTPUT_AT_EARLIEST_TRANSFORMED is used in the Windowing,
- the output timestamp for the given window will be the earliest of the
- timestamps returned by get_transformed_output_time() for elements of the
- window.
+ If TimestampCombiner.OUTPUT_AT_EARLIEST_TRANSFORMED is used in the
+ Windowing, the output timestamp for the given window will be the earliest
+ of the timestamps returned by get_transformed_output_time() for elements
+ of the window.
Arguments:
window: Output window of element.
http://git-wip-us.apache.org/repos/asf/beam/blob/2aa70944/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 38a2df8..0f613d7 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -38,7 +38,7 @@ from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.window import GlobalWindow
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.window import IntervalWindow
-from apache_beam.transforms.window import OutputTimeFn
+from apache_beam.transforms.window import TimestampCombiner
from apache_beam.transforms.window import Sessions
from apache_beam.transforms.window import SlidingWindows
from apache_beam.transforms.window import TimestampedValue
@@ -271,7 +271,7 @@ class RunnerApiTest(unittest.TestCase):
Windowing(FixedWindows(1, 3), AfterCount(6),
accumulation_mode=AccumulationMode.ACCUMULATING),
Windowing(SlidingWindows(10, 15, 21), AfterCount(28),
- output_time_fn=OutputTimeFn.OUTPUT_AT_LATEST,
+ timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST,
accumulation_mode=AccumulationMode.DISCARDING)):
context = pipeline_context.PipelineContext()
self.assertEqual(