You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by cc...@apache.org on 2018/11/29 22:32:28 UTC

[beam] 01/01: Revert "Optimize several Python coder implementations."

This is an automated email from the ASF dual-hosted git repository.

ccy pushed a commit to branch revert-7130-fast-coders
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 4a6527dac57070a29c300b263749b5bacccf3ea2
Author: Charles Chen <ch...@users.noreply.github.com>
AuthorDate: Thu Nov 29 14:32:17 2018 -0800

    Revert "Optimize several Python coder implementations."
---
 sdks/python/apache_beam/coders/coder_impl.pxd      | 60 +-------------
 sdks/python/apache_beam/coders/coder_impl.py       | 94 +++++++++-------------
 sdks/python/apache_beam/coders/coders.py           |  2 +-
 .../apache_beam/tools/coders_microbenchmark.py     | 50 ++----------
 sdks/python/apache_beam/transforms/window.py       | 26 ++++--
 sdks/python/apache_beam/utils/windowed_value.pxd   | 19 -----
 sdks/python/apache_beam/utils/windowed_value.py    | 61 +-------------
 sdks/python/scripts/generate_pydoc.sh              |  1 -
 8 files changed, 71 insertions(+), 242 deletions(-)

diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
index db724e6..03d0a56 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -25,14 +25,11 @@ cimport libc.stdint
 cimport libc.stdlib
 cimport libc.string
 
-cdef extern from "math.h":
-  libc.stdint.int64_t abs "llabs"(libc.stdint.int64_t)
-
 from .stream cimport InputStream, OutputStream
 from apache_beam.utils cimport windowed_value
 
 
-cdef object loads, dumps, create_InputStream, create_OutputStream, ByteCountingOutputStream, get_varint_size, past_unicode
+cdef object loads, dumps, create_InputStream, create_OutputStream, ByteCountingOutputStream, get_varint_size
 # Temporarily untyped to allow monkeypatching on failed import.
 #cdef type WindowedValue
 
@@ -78,11 +75,8 @@ cdef unsigned char SET_TYPE
 
 cdef class FastPrimitivesCoderImpl(StreamCoderImpl):
   cdef CoderImpl fallback_coder_impl
-  @cython.locals(dict_value=dict, int_value=libc.stdint.int64_t,
-                 unicode_value=unicode)
+  @cython.locals(dict_value=dict, int_value=libc.stdint.int64_t)
   cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
-  @cython.locals(t=int)
-  cpdef decode_from_stream(self, InputStream stream, bint nested)
 
 
 cdef class BytesCoderImpl(CoderImpl):
@@ -129,9 +123,6 @@ cdef class TupleCoderImpl(AbstractComponentCoderImpl):
 cdef class SequenceCoderImpl(StreamCoderImpl):
   cdef CoderImpl _elem_coder
   cpdef _construct_from_sequence(self, values)
-  @cython.locals(buffer=OutputStream, target_buffer_size=libc.stdint.int64_t,
-                 index=libc.stdint.int64_t)
-  cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
 
 
 cdef class TupleSequenceCoderImpl(SequenceCoderImpl):
@@ -142,41 +133,8 @@ cdef class IterableCoderImpl(SequenceCoderImpl):
   pass
 
 
-cdef object IntervalWindow
-
-cdef class IntervalWindowCoderImpl(StreamCoderImpl):
-  cdef libc.stdint.uint64_t _to_normal_time(self, libc.stdint.int64_t value)
-  cdef libc.stdint.int64_t _from_normal_time(self, libc.stdint.uint64_t value)
-
-  @cython.locals(typed_value=windowed_value._IntervalWindowBase,
-                 span_millis=libc.stdint.int64_t)
-  cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
-
-  @cython.locals(typed_value=windowed_value._IntervalWindowBase)
-  cpdef decode_from_stream(self, InputStream stream, bint nested)
-
-  @cython.locals(typed_value=windowed_value._IntervalWindowBase,
-                 span_millis=libc.stdint.int64_t)
-  cpdef estimate_size(self, value, bint nested=?)
-
-
-cdef int PaneInfoTiming_UNKNOWN
-cdef int PaneInfoEncoding_FIRST
-
-
 cdef class PaneInfoCoderImpl(StreamCoderImpl):
-  cdef int _choose_encoding(self, windowed_value.PaneInfo value)
-
-  @cython.locals(pane_info=windowed_value.PaneInfo, encoding_type=int)
-  cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
-
-  @cython.locals(encoded_first_byte=int, encoding_type=int)
-  cpdef decode_from_stream(self, InputStream stream, bint nested)
-
-
-cdef libc.stdint.uint64_t _TIME_SHIFT
-cdef libc.stdint.int64_t MIN_TIMESTAMP_micros
-cdef libc.stdint.int64_t MAX_TIMESTAMP_micros
+  cdef int _choose_encoding(self, value)
 
 
 cdef class WindowedValueCoderImpl(StreamCoderImpl):
@@ -186,18 +144,8 @@ cdef class WindowedValueCoderImpl(StreamCoderImpl):
   cdef CoderImpl _windows_coder
   cdef CoderImpl _pane_info_coder
 
-  cdef libc.stdint.uint64_t _to_normal_time(self, libc.stdint.int64_t value)
-  cdef libc.stdint.int64_t _from_normal_time(self, libc.stdint.uint64_t value)
-
   @cython.locals(c=CoderImpl)
   cpdef get_estimated_size_and_observables(self, value, bint nested=?)
 
-  @cython.locals(timestamp=libc.stdint.int64_t)
-  cpdef decode_from_stream(self, InputStream stream, bint nested)
-
-  @cython.locals(wv=windowed_value.WindowedValue, restore_sign=int)
+  @cython.locals(wv=windowed_value.WindowedValue)
   cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
-
-
-cdef class LengthPrefixCoderImpl(StreamCoderImpl):
-  cdef CoderImpl _value_coder
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index c3768dd..070e07d 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -37,8 +37,8 @@ from __future__ import division
 from builtins import chr
 from builtins import object
 
-from past.builtins import unicode as past_unicode
 from past.builtins import long
+from past.builtins import unicode
 
 from apache_beam.coders import observable
 from apache_beam.utils import windowed_value
@@ -71,11 +71,6 @@ except ImportError:
 # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
 
 
-_TIME_SHIFT = 1 << 63
-MIN_TIMESTAMP_micros = MIN_TIMESTAMP.micros
-MAX_TIMESTAMP_micros = MAX_TIMESTAMP.micros
-
-
 class CoderImpl(object):
   """For internal use only; no backwards-compatibility guarantees."""
 
@@ -221,7 +216,7 @@ class DeterministicFastPrimitivesCoderImpl(CoderImpl):
     self._step_label = step_label
 
   def _check_safe(self, value):
-    if isinstance(value, (bytes, past_unicode, long, int, float)):
+    if isinstance(value, (bytes, unicode, long, int, float)):
       pass
     elif value is None:
       pass
@@ -326,10 +321,10 @@ class FastPrimitivesCoderImpl(StreamCoderImpl):
     elif t is bytes:
       stream.write_byte(BYTES_TYPE)
       stream.write(value, nested)
-    elif t is past_unicode:
-      unicode_value = value  # for typing
+    elif t is unicode:
+      text_value = value  # for typing
       stream.write_byte(UNICODE_TYPE)
-      stream.write(unicode_value.encode('utf-8'), nested)
+      stream.write(text_value.encode('utf-8'), nested)
     elif t is list or t is tuple or t is set:
       stream.write_byte(
           LIST_TYPE if t is list else TUPLE_TYPE if t is tuple else SET_TYPE)
@@ -418,47 +413,37 @@ class FloatCoderImpl(StreamCoderImpl):
     return 8
 
 
-IntervalWindow = None
-
-
 class IntervalWindowCoderImpl(StreamCoderImpl):
   """For internal use only; no backwards-compatibility guarantees."""
 
   # TODO: Fn Harness only supports millis. Is this important enough to fix?
   def _to_normal_time(self, value):
     """Convert "lexicographically ordered unsigned" to signed."""
-    return value - _TIME_SHIFT
+    return value - (1 << 63)
 
   def _from_normal_time(self, value):
     """Convert signed to "lexicographically ordered unsigned"."""
-    return value + _TIME_SHIFT
+    return value + (1 << 63)
 
   def encode_to_stream(self, value, out, nested):
-    typed_value = value
-    span_millis = (typed_value._end_micros // 1000
-                   - typed_value._start_micros // 1000)
+    span_micros = value.end.micros - value.start.micros
     out.write_bigendian_uint64(
-        self._from_normal_time(typed_value._end_micros // 1000))
-    out.write_var_int64(span_millis)
+        self._from_normal_time(value.end.micros // 1000))
+    out.write_var_int64(span_micros // 1000)
 
   def decode_from_stream(self, in_, nested):
-    global IntervalWindow
-    if IntervalWindow is None:
-      from apache_beam.transforms.window import IntervalWindow
-    typed_value = IntervalWindow(None, None)
-    typed_value._end_micros = (
-        1000 * self._to_normal_time(in_.read_bigendian_uint64()))
-    typed_value._start_micros = (
-        typed_value._end_micros - 1000 * in_.read_var_int64())
-    return typed_value
+    end_millis = self._to_normal_time(in_.read_bigendian_uint64())
+    start_millis = end_millis - in_.read_var_int64()
+    from apache_beam.transforms.window import IntervalWindow
+    ret = IntervalWindow(start=Timestamp(micros=start_millis * 1000),
+                         end=Timestamp(micros=end_millis * 1000))
+    return ret
 
   def estimate_size(self, value, nested=False):
     # An IntervalWindow is context-insensitive, with a timestamp (8 bytes)
     # and a varint timespam.
-    typed_value = value
-    span_millis = (typed_value._end_micros // 1000
-                   - typed_value._start_micros // 1000)
-    return 8 + get_varint_size(span_millis)
+    span = value.end.micros - value.start.micros
+    return 8 + get_varint_size(span // 1000)
 
 
 class TimestampCoderImpl(StreamCoderImpl):
@@ -662,11 +647,10 @@ class SequenceCoderImpl(StreamCoderImpl):
       # -1 to indicate that the length is not known.
       out.write_bigendian_int32(-1)
       buffer = create_OutputStream()
-      target_buffer_size = self._DEFAULT_BUFFER_SIZE
       prev_index = index = -1
       for index, elem in enumerate(value):
         self._elem_coder.encode_to_stream(elem, buffer, True)
-        if buffer.size() > target_buffer_size:
+        if out.size() > self._DEFAULT_BUFFER_SIZE:
           out.write_var_int64(index - prev_index)
           out.write(buffer.get())
           prev_index = index
@@ -755,31 +739,25 @@ class PaneInfoEncoding(object):
   TWO_INDICES = 2
 
 
-# These are cdef'd to ints to optimized the common case.
-PaneInfoTiming_UNKNOWN = windowed_value.PaneInfoTiming.UNKNOWN
-PaneInfoEncoding_FIRST = PaneInfoEncoding.FIRST
-
-
 class PaneInfoCoderImpl(StreamCoderImpl):
   """For internal use only; no backwards-compatibility guarantees.
 
   Coder for a PaneInfo descriptor."""
 
   def _choose_encoding(self, value):
-    if ((value._index == 0 and value._nonspeculative_index == 0) or
-        value._timing == PaneInfoTiming_UNKNOWN):
-      return PaneInfoEncoding_FIRST
-    elif (value._index == value._nonspeculative_index or
-          value._timing == windowed_value.PaneInfoTiming.EARLY):
+    if ((value.index == 0 and value.nonspeculative_index == 0) or
+        value.timing == windowed_value.PaneInfoTiming.UNKNOWN):
+      return PaneInfoEncoding.FIRST
+    elif (value.index == value.nonspeculative_index or
+          value.timing == windowed_value.PaneInfoTiming.EARLY):
       return PaneInfoEncoding.ONE_INDEX
     else:
       return PaneInfoEncoding.TWO_INDICES
 
   def encode_to_stream(self, value, out, nested):
-    pane_info = value  # cast
-    encoding_type = self._choose_encoding(pane_info)
-    out.write_byte(pane_info._encoded_byte | (encoding_type << 4))
-    if encoding_type == PaneInfoEncoding_FIRST:
+    encoding_type = self._choose_encoding(value)
+    out.write_byte(value.encoded_byte | (encoding_type << 4))
+    if encoding_type == PaneInfoEncoding.FIRST:
       return
     elif encoding_type == PaneInfoEncoding.ONE_INDEX:
       out.write_var_int64(value.index)
@@ -794,7 +772,7 @@ class PaneInfoCoderImpl(StreamCoderImpl):
     base = windowed_value._BYTE_TO_PANE_INFO[encoded_first_byte & 0xF]
     assert base is not None
     encoding_type = encoded_first_byte >> 4
-    if encoding_type == PaneInfoEncoding_FIRST:
+    if encoding_type == PaneInfoEncoding.FIRST:
       return base
     elif encoding_type == PaneInfoEncoding.ONE_INDEX:
       index = in_stream.read_var_int64()
@@ -833,11 +811,11 @@ class WindowedValueCoderImpl(StreamCoderImpl):
   # byte representation of timestamps.
   def _to_normal_time(self, value):
     """Convert "lexicographically ordered unsigned" to signed."""
-    return value - _TIME_SHIFT
+    return value - (1 << 63)
 
   def _from_normal_time(self, value):
     """Convert signed to "lexicographically ordered unsigned"."""
-    return value + _TIME_SHIFT
+    return value + (1 << 63)
 
   def __init__(self, value_coder, timestamp_coder, window_coder):
     # TODO(lcwik): Remove the timestamp coder field
@@ -871,12 +849,16 @@ class WindowedValueCoderImpl(StreamCoderImpl):
     # were indeed MIN/MAX timestamps.
     # TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on
     # precision of timestamps.
-    if timestamp <= -(abs(MIN_TIMESTAMP_micros) // 1000):
-      timestamp = MIN_TIMESTAMP_micros
-    elif timestamp >= MAX_TIMESTAMP_micros // 1000:
-      timestamp = MAX_TIMESTAMP_micros
+    if timestamp == -(abs(MIN_TIMESTAMP.micros) // 1000):
+      timestamp = MIN_TIMESTAMP.micros
+    elif timestamp == (MAX_TIMESTAMP.micros // 1000):
+      timestamp = MAX_TIMESTAMP.micros
     else:
       timestamp *= 1000
+      if timestamp > MAX_TIMESTAMP.micros:
+        timestamp = MAX_TIMESTAMP.micros
+      if timestamp < MIN_TIMESTAMP.micros:
+        timestamp = MIN_TIMESTAMP.micros
 
     windows = self._windows_coder.decode_from_stream(in_stream, True)
     # Read PaneInfo encoded byte.
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index f5c90a8..a17bb08 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -995,7 +995,7 @@ class LengthPrefixCoder(FastCoder):
     self._value_coder = value_coder
 
   def _create_impl(self):
-    return coder_impl.LengthPrefixCoderImpl(self._value_coder.get_impl())
+    return coder_impl.LengthPrefixCoderImpl(self._value_coder)
 
   def is_deterministic(self):
     return self._value_coder.is_deterministic()
diff --git a/sdks/python/apache_beam/tools/coders_microbenchmark.py b/sdks/python/apache_beam/tools/coders_microbenchmark.py
index 695bdd1..9453d61 100644
--- a/sdks/python/apache_beam/tools/coders_microbenchmark.py
+++ b/sdks/python/apache_beam/tools/coders_microbenchmark.py
@@ -31,9 +31,7 @@ Run as:
 from __future__ import absolute_import
 from __future__ import print_function
 
-import argparse
 import random
-import re
 import string
 import sys
 
@@ -104,8 +102,7 @@ def small_list():
 
 
 def large_list():
-  # Bool is the last item in FastPrimitiveCoders before pickle.
-  return [bool(k) for k in list_int(1000)]
+  return list_int(1000)
 
 
 def small_tuple():
@@ -125,19 +122,6 @@ def large_dict():
   return {i: i for i in large_list()}
 
 
-def large_iterable():
-  yield 'a' * coders.coder_impl.SequenceCoderImpl._DEFAULT_BUFFER_SIZE
-  for k in range(1000):
-    yield k
-
-
-def globally_windowed_value():
-  return windowed_value.WindowedValue(
-      value=small_int(),
-      timestamp=12345678,
-      windows=(window.GlobalWindow(),))
-
-
 def random_windowed_value(num_windows):
   return windowed_value.WindowedValue(
       value=small_int(),
@@ -156,8 +140,7 @@ def wv_with_multiple_windows():
   return random_windowed_value(num_windows=32)
 
 
-def run_coder_benchmarks(
-    num_runs, input_size, seed, verbose, filter_regex='.*'):
+def run_coder_benchmarks(num_runs, input_size, seed, verbose):
   random.seed(seed)
 
   # TODO(BEAM-4441): Pick coders using type hints, for example:
@@ -184,9 +167,6 @@ def run_coder_benchmarks(
           coders.IterableCoder(coders.FastPrimitivesCoder()),
           large_list),
       coder_benchmark_factory(
-          coders.IterableCoder(coders.FastPrimitivesCoder()),
-          large_iterable),
-      coder_benchmark_factory(
           coders.FastPrimitivesCoder(),
           small_tuple),
       coder_benchmark_factory(
@@ -202,38 +182,20 @@ def run_coder_benchmarks(
           coders.WindowedValueCoder(coders.FastPrimitivesCoder()),
           wv_with_one_window),
       coder_benchmark_factory(
-          coders.WindowedValueCoder(coders.FastPrimitivesCoder(),
-                                    coders.IntervalWindowCoder()),
+          coders.WindowedValueCoder(coders.FastPrimitivesCoder()),
           wv_with_multiple_windows),
-      coder_benchmark_factory(
-          coders.WindowedValueCoder(coders.FastPrimitivesCoder(),
-                                    coders.GlobalWindowCoder()),
-          globally_windowed_value),
-      coder_benchmark_factory(
-          coders.LengthPrefixCoder(coders.FastPrimitivesCoder()),
-          small_int)
   ]
 
-  suite = [utils.BenchmarkConfig(b, input_size, num_runs) for b in benchmarks
-           if re.search(filter_regex, b.__name__, flags=re.I)]
+  suite = [utils.BenchmarkConfig(b, input_size, num_runs) for b in benchmarks]
   utils.run_benchmarks(suite, verbose=verbose)
 
 
 if __name__ == "__main__":
-
-  parser = argparse.ArgumentParser()
-  parser.add_argument('--filter', default='.*')
-  parser.add_argument('--num_runs', default=20, type=int)
-  parser.add_argument('--num_elements_per_benchmark', default=1000, type=int)
-  parser.add_argument('--seed', default=42, type=int)
-  options = parser.parse_args()
-
   utils.check_compiled("apache_beam.coders.coder_impl")
 
   num_runs = 20
   num_elements_per_benchmark = 1000
   seed = 42  # Fix the seed for better consistency
 
-  run_coder_benchmarks(
-      options.num_runs, options.num_elements_per_benchmark, options.seed,
-      verbose=True, filter_regex=options.filter)
+  run_coder_benchmarks(num_runs, num_elements_per_benchmark, seed,
+                       verbose=True)
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 1990532..0970a28 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -66,7 +66,6 @@ from apache_beam.portability.api import standard_window_fns_pb2
 from apache_beam.transforms import timeutil
 from apache_beam.utils import proto_utils
 from apache_beam.utils import urns
-from apache_beam.utils import windowed_value
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
 from apache_beam.utils.timestamp import Duration
 from apache_beam.utils.timestamp import Timestamp
@@ -226,18 +225,31 @@ class BoundedWindow(object):
     return '[?, %s)' % float(self.end)
 
 
-@total_ordering
-class IntervalWindow(windowed_value._IntervalWindowBase, BoundedWindow):
+class IntervalWindow(BoundedWindow):
   """A window for timestamps in range [start, end).
 
   Attributes:
     start: Start of window as seconds since Unix epoch.
     end: End of window as seconds since Unix epoch.
   """
-  def __lt__(self, other):
-    if self.end != other.end:
-      return self.end < other.end
-    return hash(self) < hash(other)
+
+  def __init__(self, start, end):
+    super(IntervalWindow, self).__init__(end)
+    self.start = Timestamp.of(start)
+
+  def __hash__(self):
+    return hash((self.start, self.end))
+
+  def __eq__(self, other):
+    return (self.start == other.start
+            and self.end == other.end
+            and type(self) == type(other))
+
+  def __ne__(self, other):
+    return not self == other
+
+  def __repr__(self):
+    return '[%s, %s)' % (float(self.start), float(self.end))
 
   def intersects(self, other):
     return other.start < self.end or self.start < other.end
diff --git a/sdks/python/apache_beam/utils/windowed_value.pxd b/sdks/python/apache_beam/utils/windowed_value.pxd
index 9c042e7..8755c93 100644
--- a/sdks/python/apache_beam/utils/windowed_value.pxd
+++ b/sdks/python/apache_beam/utils/windowed_value.pxd
@@ -21,18 +21,6 @@ from libc.stdint cimport int64_t
 
 cdef type Timestamp
 
-
-cdef list _BYTE_TO_PANE_INFO
-
-@cython.final
-cdef class PaneInfo(object):
-  cdef readonly bint _is_first
-  cdef readonly bint _is_last
-  cdef readonly int _timing
-  cdef readonly int _index
-  cdef readonly int _nonspeculative_index
-  cdef readonly unsigned char _encoded_byte
-
 @cython.final
 cdef class WindowedValue(object):
   cdef public object value
@@ -46,10 +34,3 @@ cdef class WindowedValue(object):
 @cython.locals(wv=WindowedValue)
 cpdef WindowedValue create(
   object value, int64_t timestamp_micros, object windows, object pane_info=*)
-
-
-cdef class _IntervalWindowBase(object):
-  cdef object _start_object
-  cdef int64_t _start_micros
-  cdef object _end_object
-  cdef int64_t _end_micros
diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py
index 17568a3..6498497 100644
--- a/sdks/python/apache_beam/utils/windowed_value.py
+++ b/sdks/python/apache_beam/utils/windowed_value.py
@@ -58,11 +58,11 @@ class PaneInfo(object):
 
   def _get_encoded_byte(self):
     byte = 0
-    if self._is_first:
+    if self.is_first:
       byte |= 1
-    if self._is_last:
+    if self.is_last:
       byte |= 2
-    byte |= self._timing << 2
+    byte |= self.timing << 2
     return byte
 
   @staticmethod
@@ -121,10 +121,6 @@ class PaneInfo(object):
     return hash((self.is_first, self.is_last, self.timing, self.index,
                  self.nonspeculative_index))
 
-  def __reduce__(self):
-    return PaneInfo, (self._is_first, self._is_last, self._timing, self._index,
-                      self._nonspeculative_index)
-
 
 def _construct_well_known_pane_infos():
   pane_infos = []
@@ -237,54 +233,3 @@ except TypeError:
   # the cdef class, but in this case it's OK as it's already present
   # on each instance.
   pass
-
-
-class _IntervalWindowBase(object):
-  """Optimized form of IntervalWindow storing only microseconds for endpoints.
-  """
-
-  def __init__(self, start, end):
-    if start is not None or end is not None:
-      self._start_object = Timestamp.of(start)
-      self._end_object = Timestamp.of(end)
-      try:
-        self._start_micros = self._start_object.micros
-      except OverflowError:
-        self._start_micros = (
-            MIN_TIMESTAMP.micros if self._start_object.micros < 0
-            else MAX_TIMESTAMP.micros)
-      try:
-        self._end_micros = self._end_object.micros
-      except OverflowError:
-        self._end_micros = (
-            MIN_TIMESTAMP.micros if self._end_object.micros < 0
-            else MAX_TIMESTAMP.micros)
-    else:
-      # Micros must be populated elsewhere.
-      self._start_object = self._end_object = None
-
-  @property
-  def start(self):
-    if self._start_object is None:
-      self._start_object = Timestamp(0, self._start_micros)
-    return self._start_object
-
-  @property
-  def end(self):
-    if self._end_object is None:
-      self._end_object = Timestamp(0, self._end_micros)
-    return self._end_object
-
-  def __hash__(self):
-    return hash((self._start_micros, self._end_micros))
-
-  def __eq__(self, other):
-    return (type(self) == type(other)
-            and self._start_micros == other.start.micros
-            and self._end_micros == other.end.micros)
-
-  def __ne__(self, other):
-    return not self == other
-
-  def __repr__(self):
-    return '[%s, %s)' % (float(self.start), float(self.end))
diff --git a/sdks/python/scripts/generate_pydoc.sh b/sdks/python/scripts/generate_pydoc.sh
index 4929c5e..e447d5e 100755
--- a/sdks/python/scripts/generate_pydoc.sh
+++ b/sdks/python/scripts/generate_pydoc.sh
@@ -160,7 +160,6 @@ ignore_identifiers = [
   'apache_beam.typehints.typehints.CompositeTypeHint',
   'apache_beam.typehints.typehints.TypeConstraint',
   'apache_beam.typehints.typehints.validate_composite_type_param()',
-  'apache_beam.utils.windowed_value._IntervalWindowBase',
 
   # Private classes which are used within the same module
   'WindowedTypeConstraint',  # apache_beam.typehints.typehints