You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/20 18:58:31 UTC

[1/3] incubator-beam git commit: Closes #678

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 221262858 -> a643e2009


Closes #678


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a643e200
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a643e200
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a643e200

Branch: refs/heads/python-sdk
Commit: a643e20090ea2b883aecfc4e475e6b356b1ddfc4
Parents: 2212628 a3b6fe4
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jul 20 11:58:11 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Jul 20 11:58:11 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.pxd   |  14 +-
 sdks/python/apache_beam/coders/coder_impl.py    | 131 ++++++++++++++++++-
 sdks/python/apache_beam/coders/coders.py        |  28 +++-
 .../apache_beam/coders/coders_test_common.py    |  36 +++++
 sdks/python/apache_beam/coders/slow_stream.py   |  15 +++
 sdks/python/apache_beam/coders/stream.pxd       |   2 +
 sdks/python/apache_beam/coders/stream.pyx       |  11 ++
 7 files changed, 233 insertions(+), 4 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-beam git commit: Address Robert's comments.

Posted by ro...@apache.org.
Address Robert's comments.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a3b6fe4b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a3b6fe4b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a3b6fe4b

Branch: refs/heads/python-sdk
Commit: a3b6fe4b3bffac39841403a3113fb052b8cb39ef
Parents: 48a19eb
Author: Charles Chen <cc...@google.com>
Authored: Tue Jul 19 17:54:56 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Jul 20 11:58:11 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.pxd   | 16 ++--
 sdks/python/apache_beam/coders/coder_impl.py    | 90 +++++++++++++++-----
 .../apache_beam/coders/coders_test_common.py    |  2 +
 sdks/python/apache_beam/coders/slow_stream.py   | 15 ++++
 sdks/python/apache_beam/coders/stream.pxd       |  2 +
 sdks/python/apache_beam/coders/stream.pyx       | 11 +++
 6 files changed, 106 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3b6fe4b/sdks/python/apache_beam/coders/coder_impl.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
index 05fdc89..2ca6743 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -28,7 +28,7 @@ cimport libc.string
 from .stream cimport InputStream, OutputStream
 
 
-cdef object loads, dumps, create_InputStream, create_OutputStream
+cdef object loads, dumps, create_InputStream, create_OutputStream, ByteCountingOutputStream, get_varint_size
 # Temporarily untyped to allow monkeypatching on failed import.
 #cdef type WindowedValue
 
@@ -38,8 +38,11 @@ cdef class CoderImpl(object):
   cpdef decode_from_stream(self, InputStream stream, bint nested)
   cpdef bytes encode(self, value)
   cpdef decode(self, bytes encoded)
-  cpdef estimate_size(self, value)
-  cpdef get_estimated_size_and_observables(self, value)
+  cpdef estimate_size(self, value, bint nested=?)
+  @cython.locals(varint_size=int, bits=libc.stdint.uint64_t)
+  @cython.overflowcheck(False)
+  cpdef int _get_nested_size(self, int inner_size, bint nested)
+  cpdef get_estimated_size_and_observables(self, value, bint nested=?)
 
 
 cdef class SimpleCoderImpl(CoderImpl):
@@ -94,9 +97,6 @@ cdef class SingletonCoderImpl(CoderImpl):
   cdef object _value
 
 
-cpdef int ESTIMATED_NESTED_OVERHEAD
-
-
 cdef class AbstractComponentCoderImpl(StreamCoderImpl):
   cdef tuple _coder_impls
 
@@ -109,7 +109,7 @@ cdef class AbstractComponentCoderImpl(StreamCoderImpl):
   cpdef decode_from_stream(self, InputStream stream, bint nested)
 
   @cython.locals(c=CoderImpl)
-  cpdef get_estimated_size_and_observables(self, value)
+  cpdef get_estimated_size_and_observables(self, value, bint nested=?)
 
 
 cdef class TupleCoderImpl(AbstractComponentCoderImpl):
@@ -132,4 +132,4 @@ cdef class WindowedValueCoderImpl(StreamCoderImpl):
   cdef CoderImpl _windows_coder
 
   @cython.locals(c=CoderImpl)
-  cpdef get_estimated_size_and_observables(self, value)
+  cpdef get_estimated_size_and_observables(self, value, bint nested=?)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3b6fe4b/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index d021e3b..0024fd8 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -39,12 +39,17 @@ except ImportError:
   WindowedValue = collections.namedtuple(
       'WindowedValue', ('value', 'timestamp', 'windows'))
 
+
 try:
   from stream import InputStream as create_InputStream
   from stream import OutputStream as create_OutputStream
+  from stream import ByteCountingOutputStream
+  from stream import get_varint_size
 except ImportError:
   from slow_stream import InputStream as create_InputStream
   from slow_stream import OutputStream as create_OutputStream
+  from slow_stream import ByteCountingOutputStream
+  from slow_stream import get_varint_size
 # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
 
 
@@ -66,11 +71,17 @@ class CoderImpl(object):
     """Encodes an object to an unnested string."""
     raise NotImplementedError
 
-  def estimate_size(self, value):
+  def estimate_size(self, value, nested=False):
     """Estimates the encoded size of the given value, in bytes."""
-    return len(self.encode(value))
+    return self._get_nested_size(len(self.encode(value)), nested)
+
+  def _get_nested_size(self, inner_size, nested):
+    if not nested:
+      return inner_size
+    varint_size = get_varint_size(inner_size)
+    return varint_size + inner_size
 
-  def get_estimated_size_and_observables(self, value):
+  def get_estimated_size_and_observables(self, value, nested=False):
     """Returns estimated size of value along with any nested observables.
 
     The list of nested observables is returned as a list of 2-tuples of
@@ -80,12 +91,13 @@ class CoderImpl(object):
 
     Arguments:
       value: the value whose encoded size is to be estimated.
+      nested: whether the value is nested.
 
     Returns:
       The estimated encoded size of the given value and a list of observables
       whose elements are 2-tuples of (obj, coder_impl) as described above.
     """
-    return self.estimate_size(value), []
+    return self.estimate_size(value, nested), []
 
 
 class SimpleCoderImpl(CoderImpl):
@@ -111,6 +123,12 @@ class StreamCoderImpl(CoderImpl):
   def decode(self, encoded):
     return self.decode_from_stream(create_InputStream(encoded), False)
 
+  def estimate_size(self, value, nested=False):
+    """Estimates the encoded size of the given value, in bytes."""
+    out = ByteCountingOutputStream()
+    self.encode_to_stream(value, out, nested)
+    return out.get_count()
+
 
 class CallbackCoderImpl(CoderImpl):
   """A CoderImpl that calls back to the _impl methods on the Coder itself.
@@ -139,8 +157,8 @@ class CallbackCoderImpl(CoderImpl):
   def decode(self, encoded):
     return self._decoder(encoded)
 
-  def estimate_size(self, value):
-    return self._size_estimator(value)
+  def estimate_size(self, value, nested=False):
+    return self._get_nested_size(self._size_estimator(value), nested)
 
 
 class DeterministicPickleCoderImpl(CoderImpl):
@@ -282,8 +300,8 @@ class FloatCoderImpl(StreamCoderImpl):
   def decode_from_stream(self, in_stream, nested):
     return in_stream.read_bigendian_double()
 
-  def estimate_size(self, unused_value):
-    # A double is encoded as 8 bytes.
+  def estimate_size(self, unused_value, nested=False):
+    # A double is encoded as 8 bytes, regardless of nesting.
     return 8
 
 
@@ -298,8 +316,9 @@ class TimestampCoderImpl(StreamCoderImpl):
   def decode_from_stream(self, in_stream, nested):
     return self.timestamp_class(micros=in_stream.read_bigendian_int64())
 
-  def estimate_size(self, unused_value):
-    # A Timestamp is encoded as a 64-bit integer in 8 bytes.
+  def estimate_size(self, unused_value, nested=False):
+    # A Timestamp is encoded as a 64-bit integer in 8 bytes, regardless of
+    # nesting.
     return 8
 
 
@@ -329,6 +348,10 @@ class VarIntCoderImpl(StreamCoderImpl):
         return i
     return StreamCoderImpl.decode(self, encoded)
 
+  def estimate_size(self, value, nested=False):
+    # Note that VarInts are encoded the same way regardless of nesting.
+    return get_varint_size(value)
+
 
 class SingletonCoderImpl(CoderImpl):
   """A coder that always encodes exactly one value."""
@@ -349,10 +372,8 @@ class SingletonCoderImpl(CoderImpl):
   def decode(self, encoded):
     return self._value
 
-
-# Number of bytes of overhead estimated for encoding the nested size of a
-# component as a VarInt64.
-ESTIMATED_NESTED_OVERHEAD = 2
+  def estimate_size(self, value, nested=False):
+    return 0
 
 
 class AbstractComponentCoderImpl(StreamCoderImpl):
@@ -382,13 +403,13 @@ class AbstractComponentCoderImpl(StreamCoderImpl):
     return self._construct_from_components(
         [c.decode_from_stream(in_stream, True) for c in self._coder_impls])
 
-  def estimate_size(self, value):
+  def estimate_size(self, value, nested=False):
     """Estimates the encoded size of the given value, in bytes."""
     estimated_size, _ = (
         self.get_estimated_size_and_observables(value))
     return estimated_size
 
-  def get_estimated_size_and_observables(self, value):
+  def get_estimated_size_and_observables(self, value, nested=False):
     """Returns estimated size of value along with any nested observables."""
     values = self._extract_components(value)
     estimated_size = 0
@@ -400,8 +421,8 @@ class AbstractComponentCoderImpl(StreamCoderImpl):
       else:
         c = self._coder_impls[i]  # type cast
         child_size, child_observables = (
-            c.get_estimated_size_and_observables(child_value))
-        estimated_size += child_size + ESTIMATED_NESTED_OVERHEAD
+            c.get_estimated_size_and_observables(child_value, nested=True))
+        estimated_size += child_size
         observables += child_observables
     return estimated_size, observables
 
@@ -437,6 +458,29 @@ class SequenceCoderImpl(StreamCoderImpl):
         [self._elem_coder.decode_from_stream(in_stream, True)
          for _ in range(size)])
 
+  def estimate_size(self, value, nested=False):
+    """Estimates the encoded size of the given value, in bytes."""
+    estimated_size, _ = (
+        self.get_estimated_size_and_observables(value))
+    return estimated_size
+
+  def get_estimated_size_and_observables(self, value, nested=False):
+    """Returns estimated size of value along with any nested observables."""
+    estimated_size = 0
+    observables = []
+    # Size of 32-bit integer storing number of elements.
+    estimated_size += 4
+    for elem in value:
+      if isinstance(elem, observable.ObservableMixin):
+        observables.append((elem, self._elem_coder))
+      else:
+        child_size, child_observables = (
+            self._elem_coder.get_estimated_size_and_observables(
+                elem, nested=True))
+        estimated_size += child_size
+        observables += child_observables
+    return estimated_size, observables
+
 
 class TupleSequenceCoderImpl(SequenceCoderImpl):
   """A coder for homogeneous tuple objects."""
@@ -464,7 +508,7 @@ class WindowedValueCoderImpl(StreamCoderImpl):
         self._timestamp_coder.decode_from_stream(in_stream, True),
         self._windows_coder.decode_from_stream(in_stream, True))
 
-  def get_estimated_size_and_observables(self, value):
+  def get_estimated_size_and_observables(self, value, nested=False):
     """Returns estimated size of value along with any nested observables."""
     estimated_size = 0
     observables = []
@@ -473,9 +517,11 @@ class WindowedValueCoderImpl(StreamCoderImpl):
     else:
       c = self._value_coder  # type cast
       value_estimated_size, value_observables = (
-          c.get_estimated_size_and_observables(value.value))
+          c.get_estimated_size_and_observables(value.value, nested=True))
       estimated_size += value_estimated_size
       observables += value_observables
-    estimated_size += self._timestamp_coder.estimate_size(value.timestamp)
-    estimated_size += self._windows_coder.estimate_size(value.windows)
+    estimated_size += (
+        self._timestamp_coder.estimate_size(value.timestamp, nested=True))
+    estimated_size += (
+        self._windows_coder.estimate_size(value.windows, nested=True))
     return estimated_size, observables

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3b6fe4b/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index d55e8c2..dd4a873 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -81,6 +81,8 @@ class CodersTest(unittest.TestCase):
     for v in values:
       self.assertEqual(v, coder.decode(coder.encode(v)))
       self.assertEqual(coder.estimate_size(v),
+                       len(coder.encode(v)))
+      self.assertEqual(coder.estimate_size(v),
                        coder.get_impl().estimate_size(v))
       self.assertEqual(coder.get_impl().get_estimated_size_and_observables(v),
                        (coder.get_impl().estimate_size(v), []))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3b6fe4b/sdks/python/apache_beam/coders/slow_stream.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py
index 71d5e08..ceb1f0d 100644
--- a/sdks/python/apache_beam/coders/slow_stream.py
+++ b/sdks/python/apache_beam/coders/slow_stream.py
@@ -137,3 +137,18 @@ class InputStream(object):
 
   def read_bigendian_double(self):
     return struct.unpack('>d', self.read(8))[0]
+
+
+def get_varint_size(v):
+  """Returns the size of the given integer value when encoded as a VarInt."""
+  if v < 0:
+    v += 1 << 64
+    if v <= 0:
+      raise ValueError('Value too large (negative).')
+  varint_size = 0
+  while True:
+    varint_size += 1
+    v >>= 7
+    if not v:
+      break
+  return varint_size

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3b6fe4b/sdks/python/apache_beam/coders/stream.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/stream.pxd b/sdks/python/apache_beam/coders/stream.pxd
index eb39bdf..16ea5d4 100644
--- a/sdks/python/apache_beam/coders/stream.pxd
+++ b/sdks/python/apache_beam/coders/stream.pxd
@@ -59,3 +59,5 @@ cdef class InputStream(object):
   cpdef libc.stdint.int32_t read_bigendian_int32(self) except? -1
   cpdef double read_bigendian_double(self) except? -1
   cpdef bytes read_all(self, bint nested=*)
+
+cpdef libc.stdint.int64_t get_varint_size(libc.stdint.int64_t value)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3b6fe4b/sdks/python/apache_beam/coders/stream.pyx
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/stream.pyx b/sdks/python/apache_beam/coders/stream.pyx
index 1add4cb..cde900f 100644
--- a/sdks/python/apache_beam/coders/stream.pyx
+++ b/sdks/python/apache_beam/coders/stream.pyx
@@ -202,3 +202,14 @@ cdef class InputStream(object):
   cpdef double read_bigendian_double(self) except? -1:
     cdef libc.stdint.int64_t as_long = self.read_bigendian_int64()
     return (<double*><char*>&as_long)[0]
+
+cpdef libc.stdint.int64_t get_varint_size(libc.stdint.int64_t value):
+  """Returns the size of the given integer value when encoded as a VarInt."""
+  cdef libc.stdint.int64_t varint_size = 0
+  cdef libc.stdint.uint64_t bits = value
+  while True:
+    varint_size += 1
+    bits >>= 7
+    if not bits:
+      break
+  return varint_size


[3/3] incubator-beam git commit: Add size-estimation support to Python SDK Coders

Posted by ro...@apache.org.
Add size-estimation support to Python SDK Coders


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/48a19eb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/48a19eb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/48a19eb2

Branch: refs/heads/python-sdk
Commit: 48a19eb2d1db0a878778a822835a9ef5591a8785
Parents: 2212628
Author: Charles Chen <cc...@google.com>
Authored: Mon Jul 18 12:48:11 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Jul 20 11:58:11 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.pxd   | 12 +++
 sdks/python/apache_beam/coders/coder_impl.py    | 85 +++++++++++++++++++-
 sdks/python/apache_beam/coders/coders.py        | 28 ++++++-
 .../apache_beam/coders/coders_test_common.py    | 34 ++++++++
 4 files changed, 156 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/48a19eb2/sdks/python/apache_beam/coders/coder_impl.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
index ae1caa4..05fdc89 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -38,6 +38,8 @@ cdef class CoderImpl(object):
   cpdef decode_from_stream(self, InputStream stream, bint nested)
   cpdef bytes encode(self, value)
   cpdef decode(self, bytes encoded)
+  cpdef estimate_size(self, value)
+  cpdef get_estimated_size_and_observables(self, value)
 
 
 cdef class SimpleCoderImpl(CoderImpl):
@@ -51,6 +53,7 @@ cdef class StreamCoderImpl(CoderImpl):
 cdef class CallbackCoderImpl(CoderImpl):
   cdef object _encoder
   cdef object _decoder
+  cdef object _size_estimator
 
 
 cdef class DeterministicPickleCoderImpl(CoderImpl):
@@ -91,6 +94,9 @@ cdef class SingletonCoderImpl(CoderImpl):
   cdef object _value
 
 
+cpdef int ESTIMATED_NESTED_OVERHEAD
+
+
 cdef class AbstractComponentCoderImpl(StreamCoderImpl):
   cdef tuple _coder_impls
 
@@ -102,6 +108,9 @@ cdef class AbstractComponentCoderImpl(StreamCoderImpl):
   @cython.locals(c=CoderImpl)
   cpdef decode_from_stream(self, InputStream stream, bint nested)
 
+  @cython.locals(c=CoderImpl)
+  cpdef get_estimated_size_and_observables(self, value)
+
 
 cdef class TupleCoderImpl(AbstractComponentCoderImpl):
   pass
@@ -121,3 +130,6 @@ cdef class WindowedValueCoderImpl(StreamCoderImpl):
   cdef CoderImpl _value_coder
   cdef CoderImpl _timestamp_coder
   cdef CoderImpl _windows_coder
+
+  @cython.locals(c=CoderImpl)
+  cpdef get_estimated_size_and_observables(self, value)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/48a19eb2/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 433fd81..d021e3b 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -28,6 +28,8 @@ coder_impl.pxd file for type hints.
 import collections
 from types import NoneType
 
+from apache_beam.coders import observable
+
 
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
 try:
@@ -64,6 +66,27 @@ class CoderImpl(object):
     """Encodes an object to an unnested string."""
     raise NotImplementedError
 
+  def estimate_size(self, value):
+    """Estimates the encoded size of the given value, in bytes."""
+    return len(self.encode(value))
+
+  def get_estimated_size_and_observables(self, value):
+    """Returns estimated size of value along with any nested observables.
+
+    The list of nested observables is returned as a list of 2-tuples of
+    (obj, coder_impl), where obj is an instance of observable.ObservableMixin,
+    and coder_impl is the CoderImpl that can be used to encode elements sent by
+    obj to its observers.
+
+    Arguments:
+      value: the value whose encoded size is to be estimated.
+
+    Returns:
+      The estimated encoded size of the given value and a list of observables
+      whose elements are 2-tuples of (obj, coder_impl) as described above.
+    """
+    return self.estimate_size(value), []
+
 
 class SimpleCoderImpl(CoderImpl):
   """Subclass of CoderImpl implementing stream methods using encode/decode."""
@@ -96,9 +119,13 @@ class CallbackCoderImpl(CoderImpl):
   is not overwritten.
   """
 
-  def __init__(self, encoder, decoder):
+  def __init__(self, encoder, decoder, size_estimator=None):
     self._encoder = encoder
     self._decoder = decoder
+    self._size_estimator = size_estimator or self._default_size_estimator
+
+  def _default_size_estimator(self, value):
+    return len(self.encode(value))
 
   def encode_to_stream(self, value, stream, nested):
     return stream.write(self._encoder(value), nested)
@@ -112,6 +139,9 @@ class CallbackCoderImpl(CoderImpl):
   def decode(self, encoded):
     return self._decoder(encoded)
 
+  def estimate_size(self, value):
+    return self._size_estimator(value)
+
 
 class DeterministicPickleCoderImpl(CoderImpl):
 
@@ -252,6 +282,10 @@ class FloatCoderImpl(StreamCoderImpl):
   def decode_from_stream(self, in_stream, nested):
     return in_stream.read_bigendian_double()
 
+  def estimate_size(self, unused_value):
+    # A double is encoded as 8 bytes.
+    return 8
+
 
 class TimestampCoderImpl(StreamCoderImpl):
 
@@ -264,6 +298,10 @@ class TimestampCoderImpl(StreamCoderImpl):
   def decode_from_stream(self, in_stream, nested):
     return self.timestamp_class(micros=in_stream.read_bigendian_int64())
 
+  def estimate_size(self, unused_value):
+    # A Timestamp is encoded as a 64-bit integer in 8 bytes.
+    return 8
+
 
 small_ints = [chr(_) for _ in range(128)]
 
@@ -312,7 +350,13 @@ class SingletonCoderImpl(CoderImpl):
     return self._value
 
 
+# Number of bytes of overhead estimated for encoding the nested size of a
+# component as a VarInt64.
+ESTIMATED_NESTED_OVERHEAD = 2
+
+
 class AbstractComponentCoderImpl(StreamCoderImpl):
+  """CoderImpl for coders that are comprised of several component coders."""
 
   def __init__(self, coder_impls):
     for c in coder_impls:
@@ -338,6 +382,29 @@ class AbstractComponentCoderImpl(StreamCoderImpl):
     return self._construct_from_components(
         [c.decode_from_stream(in_stream, True) for c in self._coder_impls])
 
+  def estimate_size(self, value):
+    """Estimates the encoded size of the given value, in bytes."""
+    estimated_size, _ = (
+        self.get_estimated_size_and_observables(value))
+    return estimated_size
+
+  def get_estimated_size_and_observables(self, value):
+    """Returns estimated size of value along with any nested observables."""
+    values = self._extract_components(value)
+    estimated_size = 0
+    observables = []
+    for i in range(0, len(self._coder_impls)):
+      child_value = values[i]
+      if isinstance(child_value, observable.ObservableMixin):
+        observables.append((child_value, self._coder_impls[i]))
+      else:
+        c = self._coder_impls[i]  # type cast
+        child_size, child_observables = (
+            c.get_estimated_size_and_observables(child_value))
+        estimated_size += child_size + ESTIMATED_NESTED_OVERHEAD
+        observables += child_observables
+    return estimated_size, observables
+
 
 class TupleCoderImpl(AbstractComponentCoderImpl):
   """A coder for tuple objects."""
@@ -396,3 +463,19 @@ class WindowedValueCoderImpl(StreamCoderImpl):
         self._value_coder.decode_from_stream(in_stream, True),
         self._timestamp_coder.decode_from_stream(in_stream, True),
         self._windows_coder.decode_from_stream(in_stream, True))
+
+  def get_estimated_size_and_observables(self, value):
+    """Returns estimated size of value along with any nested observables."""
+    estimated_size = 0
+    observables = []
+    if isinstance(value.value, observable.ObservableMixin):
+      observables.append((value.value, self._value_coder))
+    else:
+      c = self._value_coder  # type cast
+      value_estimated_size, value_observables = (
+          c.get_estimated_size_and_observables(value.value))
+      estimated_size += value_estimated_size
+      observables += value_observables
+    estimated_size += self._timestamp_coder.estimate_size(value.timestamp)
+    estimated_size += self._windows_coder.estimate_size(value.windows)
+    return estimated_size, observables

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/48a19eb2/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index a5ed7f9..11964b0 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -77,6 +77,26 @@ class Coder(object):
     """
     return False
 
+  def estimate_size(self, value):
+    """Estimates the encoded size of the given value, in bytes.
+
+    Dataflow estimates the encoded size of a PCollection processed in a pipeline
+    step by using the estimated size of a random sample of elements in that
+    PCollection.
+
+    The default implementation encodes the given value and returns its byte
+    size.  If a coder can provide a fast estimate of the encoded size of a value
+    (e.g., if the encoding has a fixed size), it can provide its estimate here
+    to improve performance.
+
+    Arguments:
+      value: the value whose encoded size is to be estimated.
+
+    Returns:
+      The estimated encoded size of the given value.
+    """
+    return len(self.encode(value))
+
   # ===========================================================================
   # Methods below are internal SDK details that don't need to be modified for
   # user-defined coders.
@@ -85,7 +105,8 @@ class Coder(object):
   def _create_impl(self):
     """Creates a CoderImpl to do the actual encoding and decoding.
     """
-    return coder_impl.CallbackCoderImpl(self.encode, self.decode)
+    return coder_impl.CallbackCoderImpl(self.encode, self.decode,
+                                        self.estimate_size)
 
   def get_impl(self):
     if not hasattr(self, '_impl'):
@@ -191,7 +212,7 @@ class FastCoder(Coder):
   """Coder subclass used when a (faster) CoderImpl is supplied directly.
 
   The Coder class defines _create_impl in terms of encode() and decode();
-  this class inverts that defining encode() and decode() in terms of
+  this class inverts that by defining encode() and decode() in terms of
   _create_impl().
   """
 
@@ -203,6 +224,9 @@ class FastCoder(Coder):
     """Decodes the given byte string into the corresponding object."""
     return self.get_impl().decode(encoded)
 
+  def estimate_size(self, value):
+    return self.get_impl().estimate_size(value)
+
   def _create_impl(self):
     raise NotImplementedError
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/48a19eb2/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index deb5652..d55e8c2 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -24,6 +24,7 @@ import unittest
 import dill
 
 import coders
+import observable
 
 
 # Defined out of line for picklability.
@@ -79,6 +80,10 @@ class CodersTest(unittest.TestCase):
     self._observe(coder)
     for v in values:
       self.assertEqual(v, coder.decode(coder.encode(v)))
+      self.assertEqual(coder.estimate_size(v),
+                       coder.get_impl().estimate_size(v))
+      self.assertEqual(coder.get_impl().get_estimated_size_and_observables(v),
+                       (coder.get_impl().estimate_size(v), []))
     copy1 = dill.loads(dill.dumps(coder))
     copy2 = dill.loads(dill.dumps(coder))
     for v in values:
@@ -186,6 +191,35 @@ class CodersTest(unittest.TestCase):
   def test_utf8_coder(self):
     self.check_coder(coders.StrUtf8Coder(), 'a', u'ab\u00FF', u'\u0101\0')
 
+  def test_nested_observables(self):
+    class FakeObservableIterator(observable.ObservableMixin):
+
+      def __iter__(self):
+        return iter([1, 2, 3])
+
+    # Coder for elements from the observable iterator.
+    iter_coder = coders.VarIntCoder()
+
+    # Test nested WindowedValue observable.
+    coder = coders.WindowedValueCoder(iter_coder)
+    observ = FakeObservableIterator()
+    try:
+      value = coders.coder_impl.WindowedValue(observ)
+    except TypeError:
+      # We are running tests with a fake WindowedValue implementation so as to
+      # not pull in the rest of the SDK.
+      value = coders.coder_impl.WindowedValue(observ, 0, [])
+    self.assertEqual(
+        coder.get_impl().get_estimated_size_and_observables(value)[1],
+        [(observ, iter_coder.get_impl())])
+
+    # Test nested tuple observable.
+    coder = coders.TupleCoder((coders.StrUtf8Coder(), iter_coder))
+    value = (u'123', observ)
+    self.assertEqual(
+        coder.get_impl().get_estimated_size_and_observables(value)[1],
+        [(observ, iter_coder.get_impl())])
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)