You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@beam.apache.org by cc...@apache.org on 2018/11/29 22:32:27 UTC

[beam] branch revert-7130-fast-coders created (now 4a6527d)

This is an automated email from the ASF dual-hosted git repository.

ccy pushed a change to branch revert-7130-fast-coders
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at 4a6527d  Revert "Optimize several Python coder implementations."

This branch includes the following new commits:

     new 4a6527d  Revert "Optimize several Python coder implementations."

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: Revert "Optimize several Python coder implementations."

Posted by cc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ccy pushed a commit to branch revert-7130-fast-coders
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 4a6527dac57070a29c300b263749b5bacccf3ea2
Author: Charles Chen <ch...@users.noreply.github.com>
AuthorDate: Thu Nov 29 14:32:17 2018 -0800

    Revert "Optimize several Python coder implementations."
---
 sdks/python/apache_beam/coders/coder_impl.pxd      | 60 +-------------
 sdks/python/apache_beam/coders/coder_impl.py       | 94 +++++++++-------------
 sdks/python/apache_beam/coders/coders.py           |  2 +-
 .../apache_beam/tools/coders_microbenchmark.py     | 50 ++----------
 sdks/python/apache_beam/transforms/window.py       | 26 ++++--
 sdks/python/apache_beam/utils/windowed_value.pxd   | 19 -----
 sdks/python/apache_beam/utils/windowed_value.py    | 61 +-------------
 sdks/python/scripts/generate_pydoc.sh              |  1 -
 8 files changed, 71 insertions(+), 242 deletions(-)

diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
index db724e6..03d0a56 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -25,14 +25,11 @@ cimport libc.stdint
 cimport libc.stdlib
 cimport libc.string
 
-cdef extern from "math.h":
-  libc.stdint.int64_t abs "llabs"(libc.stdint.int64_t)
-
 from .stream cimport InputStream, OutputStream
 from apache_beam.utils cimport windowed_value
 
 
-cdef object loads, dumps, create_InputStream, create_OutputStream, ByteCountingOutputStream, get_varint_size, past_unicode
+cdef object loads, dumps, create_InputStream, create_OutputStream, ByteCountingOutputStream, get_varint_size
 # Temporarily untyped to allow monkeypatching on failed import.
 #cdef type WindowedValue
 
@@ -78,11 +75,8 @@ cdef unsigned char SET_TYPE
 
 cdef class FastPrimitivesCoderImpl(StreamCoderImpl):
   cdef CoderImpl fallback_coder_impl
-  @cython.locals(dict_value=dict, int_value=libc.stdint.int64_t,
-                 unicode_value=unicode)
+  @cython.locals(dict_value=dict, int_value=libc.stdint.int64_t)
   cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
-  @cython.locals(t=int)
-  cpdef decode_from_stream(self, InputStream stream, bint nested)
 
 
 cdef class BytesCoderImpl(CoderImpl):
@@ -129,9 +123,6 @@ cdef class TupleCoderImpl(AbstractComponentCoderImpl):
 cdef class SequenceCoderImpl(StreamCoderImpl):
   cdef CoderImpl _elem_coder
   cpdef _construct_from_sequence(self, values)
-  @cython.locals(buffer=OutputStream, target_buffer_size=libc.stdint.int64_t,
-                 index=libc.stdint.int64_t)
-  cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
 
 
 cdef class TupleSequenceCoderImpl(SequenceCoderImpl):
@@ -142,41 +133,8 @@ cdef class IterableCoderImpl(SequenceCoderImpl):
   pass
 
 
-cdef object IntervalWindow
-
-cdef class IntervalWindowCoderImpl(StreamCoderImpl):
-  cdef libc.stdint.uint64_t _to_normal_time(self, libc.stdint.int64_t value)
-  cdef libc.stdint.int64_t _from_normal_time(self, libc.stdint.uint64_t value)
-
-  @cython.locals(typed_value=windowed_value._IntervalWindowBase,
-                 span_millis=libc.stdint.int64_t)
-  cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
-
-  @cython.locals(typed_value=windowed_value._IntervalWindowBase)
-  cpdef decode_from_stream(self, InputStream stream, bint nested)
-
-  @cython.locals(typed_value=windowed_value._IntervalWindowBase,
-                 span_millis=libc.stdint.int64_t)
-  cpdef estimate_size(self, value, bint nested=?)
-
-
-cdef int PaneInfoTiming_UNKNOWN
-cdef int PaneInfoEncoding_FIRST
-
-
 cdef class PaneInfoCoderImpl(StreamCoderImpl):
-  cdef int _choose_encoding(self, windowed_value.PaneInfo value)
-
-  @cython.locals(pane_info=windowed_value.PaneInfo, encoding_type=int)
-  cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
-
-  @cython.locals(encoded_first_byte=int, encoding_type=int)
-  cpdef decode_from_stream(self, InputStream stream, bint nested)
-
-
-cdef libc.stdint.uint64_t _TIME_SHIFT
-cdef libc.stdint.int64_t MIN_TIMESTAMP_micros
-cdef libc.stdint.int64_t MAX_TIMESTAMP_micros
+  cdef int _choose_encoding(self, value)
 
 
 cdef class WindowedValueCoderImpl(StreamCoderImpl):
@@ -186,18 +144,8 @@ cdef class WindowedValueCoderImpl(StreamCoderImpl):
   cdef CoderImpl _windows_coder
   cdef CoderImpl _pane_info_coder
 
-  cdef libc.stdint.uint64_t _to_normal_time(self, libc.stdint.int64_t value)
-  cdef libc.stdint.int64_t _from_normal_time(self, libc.stdint.uint64_t value)
-
   @cython.locals(c=CoderImpl)
   cpdef get_estimated_size_and_observables(self, value, bint nested=?)
 
-  @cython.locals(timestamp=libc.stdint.int64_t)
-  cpdef decode_from_stream(self, InputStream stream, bint nested)
-
-  @cython.locals(wv=windowed_value.WindowedValue, restore_sign=int)
+  @cython.locals(wv=windowed_value.WindowedValue)
   cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
-
-
-cdef class LengthPrefixCoderImpl(StreamCoderImpl):
-  cdef CoderImpl _value_coder
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index c3768dd..070e07d 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -37,8 +37,8 @@ from __future__ import division
 from builtins import chr
 from builtins import object
 
-from past.builtins import unicode as past_unicode
 from past.builtins import long
+from past.builtins import unicode
 
 from apache_beam.coders import observable
 from apache_beam.utils import windowed_value
@@ -71,11 +71,6 @@ except ImportError:
 # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
 
 
-_TIME_SHIFT = 1 << 63
-MIN_TIMESTAMP_micros = MIN_TIMESTAMP.micros
-MAX_TIMESTAMP_micros = MAX_TIMESTAMP.micros
-
-
 class CoderImpl(object):
   """For internal use only; no backwards-compatibility guarantees."""
 
@@ -221,7 +216,7 @@ class DeterministicFastPrimitivesCoderImpl(CoderImpl):
     self._step_label = step_label
 
   def _check_safe(self, value):
-    if isinstance(value, (bytes, past_unicode, long, int, float)):
+    if isinstance(value, (bytes, unicode, long, int, float)):
       pass
     elif value is None:
       pass
@@ -326,10 +321,10 @@ class FastPrimitivesCoderImpl(StreamCoderImpl):
     elif t is bytes:
       stream.write_byte(BYTES_TYPE)
       stream.write(value, nested)
-    elif t is past_unicode:
-      unicode_value = value  # for typing
+    elif t is unicode:
+      text_value = value  # for typing
       stream.write_byte(UNICODE_TYPE)
-      stream.write(unicode_value.encode('utf-8'), nested)
+      stream.write(text_value.encode('utf-8'), nested)
     elif t is list or t is tuple or t is set:
       stream.write_byte(
           LIST_TYPE if t is list else TUPLE_TYPE if t is tuple else SET_TYPE)
@@ -418,47 +413,37 @@ class FloatCoderImpl(StreamCoderImpl):
     return 8
 
 
-IntervalWindow = None
-
-
 class IntervalWindowCoderImpl(StreamCoderImpl):
   """For internal use only; no backwards-compatibility guarantees."""
 
   # TODO: Fn Harness only supports millis. Is this important enough to fix?
   def _to_normal_time(self, value):
     """Convert "lexicographically ordered unsigned" to signed."""
-    return value - _TIME_SHIFT
+    return value - (1 << 63)
 
   def _from_normal_time(self, value):
     """Convert signed to "lexicographically ordered unsigned"."""
-    return value + _TIME_SHIFT
+    return value + (1 << 63)
 
   def encode_to_stream(self, value, out, nested):
-    typed_value = value
-    span_millis = (typed_value._end_micros // 1000
-                   - typed_value._start_micros // 1000)
+    span_micros = value.end.micros - value.start.micros
     out.write_bigendian_uint64(
-        self._from_normal_time(typed_value._end_micros // 1000))
-    out.write_var_int64(span_millis)
+        self._from_normal_time(value.end.micros // 1000))
+    out.write_var_int64(span_micros // 1000)
 
   def decode_from_stream(self, in_, nested):
-    global IntervalWindow
-    if IntervalWindow is None:
-      from apache_beam.transforms.window import IntervalWindow
-    typed_value = IntervalWindow(None, None)
-    typed_value._end_micros = (
-        1000 * self._to_normal_time(in_.read_bigendian_uint64()))
-    typed_value._start_micros = (
-        typed_value._end_micros - 1000 * in_.read_var_int64())
-    return typed_value
+    end_millis = self._to_normal_time(in_.read_bigendian_uint64())
+    start_millis = end_millis - in_.read_var_int64()
+    from apache_beam.transforms.window import IntervalWindow
+    ret = IntervalWindow(start=Timestamp(micros=start_millis * 1000),
+                         end=Timestamp(micros=end_millis * 1000))
+    return ret
 
   def estimate_size(self, value, nested=False):
     # An IntervalWindow is context-insensitive, with a timestamp (8 bytes)
     # and a varint timespam.
-    typed_value = value
-    span_millis = (typed_value._end_micros // 1000
-                   - typed_value._start_micros // 1000)
-    return 8 + get_varint_size(span_millis)
+    span = value.end.micros - value.start.micros
+    return 8 + get_varint_size(span // 1000)
 
 
 class TimestampCoderImpl(StreamCoderImpl):
@@ -662,11 +647,10 @@ class SequenceCoderImpl(StreamCoderImpl):
       # -1 to indicate that the length is not known.
       out.write_bigendian_int32(-1)
       buffer = create_OutputStream()
-      target_buffer_size = self._DEFAULT_BUFFER_SIZE
       prev_index = index = -1
       for index, elem in enumerate(value):
         self._elem_coder.encode_to_stream(elem, buffer, True)
-        if buffer.size() > target_buffer_size:
+        if out.size() > self._DEFAULT_BUFFER_SIZE:
           out.write_var_int64(index - prev_index)
           out.write(buffer.get())
           prev_index = index
@@ -755,31 +739,25 @@ class PaneInfoEncoding(object):
   TWO_INDICES = 2
 
 
-# These are cdef'd to ints to optimized the common case.
-PaneInfoTiming_UNKNOWN = windowed_value.PaneInfoTiming.UNKNOWN
-PaneInfoEncoding_FIRST = PaneInfoEncoding.FIRST
-
-
 class PaneInfoCoderImpl(StreamCoderImpl):
   """For internal use only; no backwards-compatibility guarantees.
 
   Coder for a PaneInfo descriptor."""
 
   def _choose_encoding(self, value):
-    if ((value._index == 0 and value._nonspeculative_index == 0) or
-        value._timing == PaneInfoTiming_UNKNOWN):
-      return PaneInfoEncoding_FIRST
-    elif (value._index == value._nonspeculative_index or
-          value._timing == windowed_value.PaneInfoTiming.EARLY):
+    if ((value.index == 0 and value.nonspeculative_index == 0) or
+        value.timing == windowed_value.PaneInfoTiming.UNKNOWN):
+      return PaneInfoEncoding.FIRST
+    elif (value.index == value.nonspeculative_index or
+          value.timing == windowed_value.PaneInfoTiming.EARLY):
       return PaneInfoEncoding.ONE_INDEX
     else:
       return PaneInfoEncoding.TWO_INDICES
 
   def encode_to_stream(self, value, out, nested):
-    pane_info = value  # cast
-    encoding_type = self._choose_encoding(pane_info)
-    out.write_byte(pane_info._encoded_byte | (encoding_type << 4))
-    if encoding_type == PaneInfoEncoding_FIRST:
+    encoding_type = self._choose_encoding(value)
+    out.write_byte(value.encoded_byte | (encoding_type << 4))
+    if encoding_type == PaneInfoEncoding.FIRST:
       return
     elif encoding_type == PaneInfoEncoding.ONE_INDEX:
       out.write_var_int64(value.index)
@@ -794,7 +772,7 @@ class PaneInfoCoderImpl(StreamCoderImpl):
     base = windowed_value._BYTE_TO_PANE_INFO[encoded_first_byte & 0xF]
     assert base is not None
     encoding_type = encoded_first_byte >> 4
-    if encoding_type == PaneInfoEncoding_FIRST:
+    if encoding_type == PaneInfoEncoding.FIRST:
       return base
     elif encoding_type == PaneInfoEncoding.ONE_INDEX:
       index = in_stream.read_var_int64()
@@ -833,11 +811,11 @@ class WindowedValueCoderImpl(StreamCoderImpl):
   # byte representation of timestamps.
   def _to_normal_time(self, value):
     """Convert "lexicographically ordered unsigned" to signed."""
-    return value - _TIME_SHIFT
+    return value - (1 << 63)
 
   def _from_normal_time(self, value):
     """Convert signed to "lexicographically ordered unsigned"."""
-    return value + _TIME_SHIFT
+    return value + (1 << 63)
 
   def __init__(self, value_coder, timestamp_coder, window_coder):
     # TODO(lcwik): Remove the timestamp coder field
@@ -871,12 +849,16 @@ class WindowedValueCoderImpl(StreamCoderImpl):
     # were indeed MIN/MAX timestamps.
     # TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on
     # precision of timestamps.
-    if timestamp <= -(abs(MIN_TIMESTAMP_micros) // 1000):
-      timestamp = MIN_TIMESTAMP_micros
-    elif timestamp >= MAX_TIMESTAMP_micros // 1000:
-      timestamp = MAX_TIMESTAMP_micros
+    if timestamp == -(abs(MIN_TIMESTAMP.micros) // 1000):
+      timestamp = MIN_TIMESTAMP.micros
+    elif timestamp == (MAX_TIMESTAMP.micros // 1000):
+      timestamp = MAX_TIMESTAMP.micros
     else:
       timestamp *= 1000
+      if timestamp > MAX_TIMESTAMP.micros:
+        timestamp = MAX_TIMESTAMP.micros
+      if timestamp < MIN_TIMESTAMP.micros:
+        timestamp = MIN_TIMESTAMP.micros
 
     windows = self._windows_coder.decode_from_stream(in_stream, True)
     # Read PaneInfo encoded byte.
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index f5c90a8..a17bb08 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -995,7 +995,7 @@ class LengthPrefixCoder(FastCoder):
     self._value_coder = value_coder
 
   def _create_impl(self):
-    return coder_impl.LengthPrefixCoderImpl(self._value_coder.get_impl())
+    return coder_impl.LengthPrefixCoderImpl(self._value_coder)
 
   def is_deterministic(self):
     return self._value_coder.is_deterministic()
diff --git a/sdks/python/apache_beam/tools/coders_microbenchmark.py b/sdks/python/apache_beam/tools/coders_microbenchmark.py
index 695bdd1..9453d61 100644
--- a/sdks/python/apache_beam/tools/coders_microbenchmark.py
+++ b/sdks/python/apache_beam/tools/coders_microbenchmark.py
@@ -31,9 +31,7 @@ Run as:
 from __future__ import absolute_import
 from __future__ import print_function
 
-import argparse
 import random
-import re
 import string
 import sys
 
@@ -104,8 +102,7 @@ def small_list():
 
 
 def large_list():
-  # Bool is the last item in FastPrimitiveCoders before pickle.
-  return [bool(k) for k in list_int(1000)]
+  return list_int(1000)
 
 
 def small_tuple():
@@ -125,19 +122,6 @@ def large_dict():
   return {i: i for i in large_list()}
 
 
-def large_iterable():
-  yield 'a' * coders.coder_impl.SequenceCoderImpl._DEFAULT_BUFFER_SIZE
-  for k in range(1000):
-    yield k
-
-
-def globally_windowed_value():
-  return windowed_value.WindowedValue(
-      value=small_int(),
-      timestamp=12345678,
-      windows=(window.GlobalWindow(),))
-
-
 def random_windowed_value(num_windows):
   return windowed_value.WindowedValue(
       value=small_int(),
@@ -156,8 +140,7 @@ def wv_with_multiple_windows():
   return random_windowed_value(num_windows=32)
 
 
-def run_coder_benchmarks(
-    num_runs, input_size, seed, verbose, filter_regex='.*'):
+def run_coder_benchmarks(num_runs, input_size, seed, verbose):
   random.seed(seed)
 
   # TODO(BEAM-4441): Pick coders using type hints, for example:
@@ -184,9 +167,6 @@ def run_coder_benchmarks(
           coders.IterableCoder(coders.FastPrimitivesCoder()),
           large_list),
       coder_benchmark_factory(
-          coders.IterableCoder(coders.FastPrimitivesCoder()),
-          large_iterable),
-      coder_benchmark_factory(
           coders.FastPrimitivesCoder(),
           small_tuple),
       coder_benchmark_factory(
@@ -202,38 +182,20 @@ def run_coder_benchmarks(
           coders.WindowedValueCoder(coders.FastPrimitivesCoder()),
           wv_with_one_window),
       coder_benchmark_factory(
-          coders.WindowedValueCoder(coders.FastPrimitivesCoder(),
-                                    coders.IntervalWindowCoder()),
+          coders.WindowedValueCoder(coders.FastPrimitivesCoder()),
           wv_with_multiple_windows),
-      coder_benchmark_factory(
-          coders.WindowedValueCoder(coders.FastPrimitivesCoder(),
-                                    coders.GlobalWindowCoder()),
-          globally_windowed_value),
-      coder_benchmark_factory(
-          coders.LengthPrefixCoder(coders.FastPrimitivesCoder()),
-          small_int)
   ]
 
-  suite = [utils.BenchmarkConfig(b, input_size, num_runs) for b in benchmarks
-           if re.search(filter_regex, b.__name__, flags=re.I)]
+  suite = [utils.BenchmarkConfig(b, input_size, num_runs) for b in benchmarks]
   utils.run_benchmarks(suite, verbose=verbose)
 
 
 if __name__ == "__main__":
-
-  parser = argparse.ArgumentParser()
-  parser.add_argument('--filter', default='.*')
-  parser.add_argument('--num_runs', default=20, type=int)
-  parser.add_argument('--num_elements_per_benchmark', default=1000, type=int)
-  parser.add_argument('--seed', default=42, type=int)
-  options = parser.parse_args()
-
   utils.check_compiled("apache_beam.coders.coder_impl")
 
   num_runs = 20
   num_elements_per_benchmark = 1000
   seed = 42  # Fix the seed for better consistency
 
-  run_coder_benchmarks(
-      options.num_runs, options.num_elements_per_benchmark, options.seed,
-      verbose=True, filter_regex=options.filter)
+  run_coder_benchmarks(num_runs, num_elements_per_benchmark, seed,
+                       verbose=True)
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 1990532..0970a28 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -66,7 +66,6 @@ from apache_beam.portability.api import standard_window_fns_pb2
 from apache_beam.transforms import timeutil
 from apache_beam.utils import proto_utils
 from apache_beam.utils import urns
-from apache_beam.utils import windowed_value
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
 from apache_beam.utils.timestamp import Duration
 from apache_beam.utils.timestamp import Timestamp
@@ -226,18 +225,31 @@ class BoundedWindow(object):
     return '[?, %s)' % float(self.end)
 
 
-@total_ordering
-class IntervalWindow(windowed_value._IntervalWindowBase, BoundedWindow):
+class IntervalWindow(BoundedWindow):
   """A window for timestamps in range [start, end).
 
   Attributes:
     start: Start of window as seconds since Unix epoch.
     end: End of window as seconds since Unix epoch.
   """
-  def __lt__(self, other):
-    if self.end != other.end:
-      return self.end < other.end
-    return hash(self) < hash(other)
+
+  def __init__(self, start, end):
+    super(IntervalWindow, self).__init__(end)
+    self.start = Timestamp.of(start)
+
+  def __hash__(self):
+    return hash((self.start, self.end))
+
+  def __eq__(self, other):
+    return (self.start == other.start
+            and self.end == other.end
+            and type(self) == type(other))
+
+  def __ne__(self, other):
+    return not self == other
+
+  def __repr__(self):
+    return '[%s, %s)' % (float(self.start), float(self.end))
 
   def intersects(self, other):
     return other.start < self.end or self.start < other.end
diff --git a/sdks/python/apache_beam/utils/windowed_value.pxd b/sdks/python/apache_beam/utils/windowed_value.pxd
index 9c042e7..8755c93 100644
--- a/sdks/python/apache_beam/utils/windowed_value.pxd
+++ b/sdks/python/apache_beam/utils/windowed_value.pxd
@@ -21,18 +21,6 @@ from libc.stdint cimport int64_t
 
 cdef type Timestamp
 
-
-cdef list _BYTE_TO_PANE_INFO
-
-@cython.final
-cdef class PaneInfo(object):
-  cdef readonly bint _is_first
-  cdef readonly bint _is_last
-  cdef readonly int _timing
-  cdef readonly int _index
-  cdef readonly int _nonspeculative_index
-  cdef readonly unsigned char _encoded_byte
-
 @cython.final
 cdef class WindowedValue(object):
   cdef public object value
@@ -46,10 +34,3 @@ cdef class WindowedValue(object):
 @cython.locals(wv=WindowedValue)
 cpdef WindowedValue create(
   object value, int64_t timestamp_micros, object windows, object pane_info=*)
-
-
-cdef class _IntervalWindowBase(object):
-  cdef object _start_object
-  cdef int64_t _start_micros
-  cdef object _end_object
-  cdef int64_t _end_micros
diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py
index 17568a3..6498497 100644
--- a/sdks/python/apache_beam/utils/windowed_value.py
+++ b/sdks/python/apache_beam/utils/windowed_value.py
@@ -58,11 +58,11 @@ class PaneInfo(object):
 
   def _get_encoded_byte(self):
     byte = 0
-    if self._is_first:
+    if self.is_first:
       byte |= 1
-    if self._is_last:
+    if self.is_last:
       byte |= 2
-    byte |= self._timing << 2
+    byte |= self.timing << 2
     return byte
 
   @staticmethod
@@ -121,10 +121,6 @@ class PaneInfo(object):
     return hash((self.is_first, self.is_last, self.timing, self.index,
                  self.nonspeculative_index))
 
-  def __reduce__(self):
-    return PaneInfo, (self._is_first, self._is_last, self._timing, self._index,
-                      self._nonspeculative_index)
-
 
 def _construct_well_known_pane_infos():
   pane_infos = []
@@ -237,54 +233,3 @@ except TypeError:
   # the cdef class, but in this case it's OK as it's already present
   # on each instance.
   pass
-
-
-class _IntervalWindowBase(object):
-  """Optimized form of IntervalWindow storing only microseconds for endpoints.
-  """
-
-  def __init__(self, start, end):
-    if start is not None or end is not None:
-      self._start_object = Timestamp.of(start)
-      self._end_object = Timestamp.of(end)
-      try:
-        self._start_micros = self._start_object.micros
-      except OverflowError:
-        self._start_micros = (
-            MIN_TIMESTAMP.micros if self._start_object.micros < 0
-            else MAX_TIMESTAMP.micros)
-      try:
-        self._end_micros = self._end_object.micros
-      except OverflowError:
-        self._end_micros = (
-            MIN_TIMESTAMP.micros if self._end_object.micros < 0
-            else MAX_TIMESTAMP.micros)
-    else:
-      # Micros must be populated elsewhere.
-      self._start_object = self._end_object = None
-
-  @property
-  def start(self):
-    if self._start_object is None:
-      self._start_object = Timestamp(0, self._start_micros)
-    return self._start_object
-
-  @property
-  def end(self):
-    if self._end_object is None:
-      self._end_object = Timestamp(0, self._end_micros)
-    return self._end_object
-
-  def __hash__(self):
-    return hash((self._start_micros, self._end_micros))
-
-  def __eq__(self, other):
-    return (type(self) == type(other)
-            and self._start_micros == other.start.micros
-            and self._end_micros == other.end.micros)
-
-  def __ne__(self, other):
-    return not self == other
-
-  def __repr__(self):
-    return '[%s, %s)' % (float(self.start), float(self.end))
diff --git a/sdks/python/scripts/generate_pydoc.sh b/sdks/python/scripts/generate_pydoc.sh
index 4929c5e..e447d5e 100755
--- a/sdks/python/scripts/generate_pydoc.sh
+++ b/sdks/python/scripts/generate_pydoc.sh
@@ -160,7 +160,6 @@ ignore_identifiers = [
   'apache_beam.typehints.typehints.CompositeTypeHint',
   'apache_beam.typehints.typehints.TypeConstraint',
   'apache_beam.typehints.typehints.validate_composite_type_param()',
-  'apache_beam.utils.windowed_value._IntervalWindowBase',
 
   # Private classes which are used within the same module
   'WindowedTypeConstraint',  # apache_beam.typehints.typehints