Posted to commits@beam.apache.org by ch...@apache.org on 2017/05/10 08:45:18 UTC

[1/2] beam git commit: Renames some Python classes and functions that were unnecessarily public.

Repository: beam
Updated Branches:
  refs/heads/master b1a2dbeb1 -> 1fafaa846


Renames some Python classes and functions that were unnecessarily public.

Adds a note to the documentation of classes that are public but should only be used internally by the SDK (non-user-facing classes).

Marks some of the modules as experimental.
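
For illustration, the convention applied throughout this change looks like the following (names taken from the diff below; the before/after pairing is a sketch, not a verbatim excerpt):

    # Before: an unnecessarily public name.
    class OutputProcessor(object):
      """Processes output produced by DoFn method invocations."""

    # After: renamed with a leading underscore where nothing outside the
    # SDK should reference it...
    class _OutputProcessor(object):
      """Processes output produced by DoFn method invocations."""

    # ...or, where the name must stay importable, documented as internal-only.
    class Receiver(object):
      """For internal use only; no backwards-compatibility guarantees.

      An object that consumes a WindowedValue.
      """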


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cc3d07aa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cc3d07aa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cc3d07aa

Branch: refs/heads/master
Commit: cc3d07aadaa88841e039f8bc15b3b979fca77d22
Parents: b1a2dbe
Author: chamikara@google.com <ch...@google.com>
Authored: Tue May 9 19:56:14 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Wed May 10 01:44:03 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.py    | 57 ++++++++++++----
 sdks/python/apache_beam/coders/observable.py    |  4 +-
 sdks/python/apache_beam/coders/slow_stream.py   | 16 +++--
 sdks/python/apache_beam/coders/typecoders.py    |  4 +-
 sdks/python/apache_beam/internal/gcp/auth.py    | 18 +++--
 .../apache_beam/internal/gcp/json_value.py      | 12 +++-
 sdks/python/apache_beam/internal/pickler.py     | 26 ++++---
 sdks/python/apache_beam/internal/util.py        | 16 +++--
 sdks/python/apache_beam/io/concat_source.py     |  8 ++-
 sdks/python/apache_beam/metrics/cells.py        | 32 ++++++---
 sdks/python/apache_beam/metrics/execution.py    |  7 +-
 .../apache_beam/runners/api/beam_fn_api_pb2.py  |  2 +
 .../runners/api/beam_fn_api_pb2_grpc.py         |  2 +
 .../runners/api/beam_runner_api_pb2.py          |  2 +
 sdks/python/apache_beam/runners/common.pxd      |  4 +-
 sdks/python/apache_beam/runners/common.py       | 28 +++++---
 .../runners/dataflow/internal/apiclient.py      |  4 +-
 .../runners/dataflow/internal/dependency.py     | 16 +++--
 .../runners/dataflow/internal/names.py          |  8 ++-
 .../runners/dataflow/native_io/iobase.py        |  2 +
 .../runners/direct/bundle_factory.py            | 16 +++--
 sdks/python/apache_beam/runners/direct/clock.py |  5 +-
 .../consumer_tracking_pipeline_visitor.py       |  4 +-
 .../apache_beam/runners/direct/executor.py      | 63 +++++++++--------
 .../runners/direct/transform_evaluator.py       |  8 ++-
 .../runners/direct/transform_result.py          |  4 +-
 .../runners/direct/watermark_manager.py         | 10 +--
 .../apache_beam/runners/pipeline_context.py     |  4 +-
 .../runners/portability/fn_api_runner.py        |  2 +
 .../portability/maptask_executor_runner.py      |  2 +
 sdks/python/apache_beam/runners/runner.py       |  8 ++-
 .../apache_beam/runners/worker/data_plane.py    |  2 +
 .../apache_beam/runners/worker/log_handler.py   |  2 +
 .../python/apache_beam/runners/worker/logger.py |  2 +
 .../apache_beam/runners/worker/opcounters.py    |  2 +
 .../runners/worker/operation_specs.py           |  2 +
 .../apache_beam/runners/worker/sdk_worker.py    |  4 ++
 .../runners/worker/sdk_worker_main.py           |  2 +
 .../apache_beam/runners/worker/sideinputs.py    |  2 +
 .../runners/worker/statesampler_fake.py         |  2 +
 sdks/python/apache_beam/transforms/core.py      | 12 +++-
 .../python/apache_beam/transforms/ptransform.py | 14 ++--
 sdks/python/apache_beam/transforms/trigger.py   | 72 ++++++++++----------
 43 files changed, 346 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index d56606d..a0496a2 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -50,6 +50,7 @@ except ImportError:
 
 
 class CoderImpl(object):
+  """For internal use only; no backwards-compatibility guarantees."""
 
   def encode_to_stream(self, value, stream, nested):
     """Reads object from potentially-nested encoding in stream."""
@@ -97,7 +98,9 @@ class CoderImpl(object):
 
 
 class SimpleCoderImpl(CoderImpl):
-  """Subclass of CoderImpl implementing stream methods using encode/decode."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Subclass of CoderImpl implementing stream methods using encode/decode."""
 
   def encode_to_stream(self, value, stream, nested):
     """Reads object from potentially-nested encoding in stream."""
@@ -109,7 +112,9 @@ class SimpleCoderImpl(CoderImpl):
 
 
 class StreamCoderImpl(CoderImpl):
-  """Subclass of CoderImpl implementing encode/decode using stream methods."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Subclass of CoderImpl implementing encode/decode using stream methods."""
 
   def encode(self, value):
     out = create_OutputStream()
@@ -127,7 +132,9 @@ class StreamCoderImpl(CoderImpl):
 
 
 class CallbackCoderImpl(CoderImpl):
-  """A CoderImpl that calls back to the _impl methods on the Coder itself.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A CoderImpl that calls back to the _impl methods on the Coder itself.
 
   This is the default implementation used if Coder._get_impl()
   is not overwritten.
@@ -166,6 +173,7 @@ class CallbackCoderImpl(CoderImpl):
 
 
 class DeterministicFastPrimitivesCoderImpl(CoderImpl):
+  """For internal use only; no backwards-compatibility guarantees."""
 
   def __init__(self, coder, step_label):
     self._underlying_coder = coder
@@ -208,6 +216,7 @@ class DeterministicFastPrimitivesCoderImpl(CoderImpl):
 
 
 class ProtoCoderImpl(SimpleCoderImpl):
+  """For internal use only; no backwards-compatibility guarantees."""
 
   def __init__(self, proto_message_type):
     self.proto_message_type = proto_message_type
@@ -235,6 +244,7 @@ SET_TYPE = 8
 
 
 class FastPrimitivesCoderImpl(StreamCoderImpl):
+  """For internal use only; no backwards-compatibility guarantees."""
 
   def __init__(self, fallback_coder_impl):
     self.fallback_coder_impl = fallback_coder_impl
@@ -319,7 +329,9 @@ class FastPrimitivesCoderImpl(StreamCoderImpl):
 
 
 class BytesCoderImpl(CoderImpl):
-  """A coder for bytes/str objects."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A coder for bytes/str objects."""
 
   def encode_to_stream(self, value, out, nested):
     out.write(value, nested)
@@ -336,6 +348,7 @@ class BytesCoderImpl(CoderImpl):
 
 
 class FloatCoderImpl(StreamCoderImpl):
+  """For internal use only; no backwards-compatibility guarantees."""
 
   def encode_to_stream(self, value, out, nested):
     out.write_bigendian_double(value)
@@ -349,6 +362,8 @@ class FloatCoderImpl(StreamCoderImpl):
 
 
 class IntervalWindowCoderImpl(StreamCoderImpl):
+  """For internal use only; no backwards-compatibility guarantees."""
+
   # TODO: Fn Harness only supports millis. Is this important enough to fix?
   def _to_normal_time(self, value):
     """Convert "lexicographically ordered unsigned" to signed."""
@@ -379,6 +394,8 @@ class IntervalWindowCoderImpl(StreamCoderImpl):
 
 
 class TimestampCoderImpl(StreamCoderImpl):
+  """For internal use only; no backwards-compatibility guarantees."""
+
   def encode_to_stream(self, value, out, nested):
     out.write_bigendian_int64(value.micros)
 
@@ -395,7 +412,9 @@ small_ints = [chr(_) for _ in range(128)]
 
 
 class VarIntCoderImpl(StreamCoderImpl):
-  """A coder for long/int objects."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A coder for long/int objects."""
 
   def encode_to_stream(self, value, out, nested):
     out.write_var_int64(value)
@@ -422,7 +441,9 @@ class VarIntCoderImpl(StreamCoderImpl):
 
 
 class SingletonCoderImpl(CoderImpl):
-  """A coder that always encodes exactly one value."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A coder that always encodes exactly one value."""
 
   def __init__(self, value):
     self._value = value
@@ -445,7 +466,9 @@ class SingletonCoderImpl(CoderImpl):
 
 
 class AbstractComponentCoderImpl(StreamCoderImpl):
-  """CoderImpl for coders that are comprised of several component coders."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  CoderImpl for coders that are comprised of several component coders."""
 
   def __init__(self, coder_impls):
     for c in coder_impls:
@@ -507,7 +530,9 @@ class TupleCoderImpl(AbstractComponentCoderImpl):
 
 
 class SequenceCoderImpl(StreamCoderImpl):
-  """A coder for sequences.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A coder for sequences.
 
   If the length of the sequence is known, we encode the length as a 32-bit
   ``int`` followed by the encoded bytes.
@@ -611,21 +636,27 @@ class SequenceCoderImpl(StreamCoderImpl):
 
 
 class TupleSequenceCoderImpl(SequenceCoderImpl):
-  """A coder for homogeneous tuple objects."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A coder for homogeneous tuple objects."""
 
   def _construct_from_sequence(self, components):
     return tuple(components)
 
 
 class IterableCoderImpl(SequenceCoderImpl):
-  """A coder for homogeneous iterable objects."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A coder for homogeneous iterable objects."""
 
   def _construct_from_sequence(self, components):
     return components
 
 
 class WindowedValueCoderImpl(StreamCoderImpl):
-  """A coder for windowed values."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A coder for windowed values."""
 
   # Ensure that lexicographic ordering of the bytes corresponds to
   # chronological order of timestamps.
@@ -713,7 +744,9 @@ class WindowedValueCoderImpl(StreamCoderImpl):
 
 
 class LengthPrefixCoderImpl(StreamCoderImpl):
-  """Coder which prefixes the length of the encoded object in the stream."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Coder which prefixes the length of the encoded object in the stream."""
 
   def __init__(self, value_coder):
     self._value_coder = value_coder
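
A minimal, self-contained sketch of the adapter pattern the SimpleCoderImpl docstring above describes, deriving the stream methods from encode()/decode(); the stream class here is a toy stand-in, not Beam's:

    import io

    class ToyOutputStream(object):
      """Toy stand-in for the SDK's OutputStream."""

      def __init__(self):
        self._buf = io.BytesIO()

      def write(self, data, nested=False):
        self._buf.write(data)

      def get(self):
        return self._buf.getvalue()

    class ToySimpleCoderImpl(object):
      """Implements the stream methods in terms of encode()/decode()."""

      def encode(self, value):
        return value.encode('utf-8')

      def decode(self, encoded):
        return encoded.decode('utf-8')

      def encode_to_stream(self, value, out, nested):
        out.write(self.encode(value), nested)

    out = ToyOutputStream()
    ToySimpleCoderImpl().encode_to_stream(u'beam', out, nested=False)
    assert out.get() == b'beam'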

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/coders/observable.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/observable.py b/sdks/python/apache_beam/coders/observable.py
index f344b5d..5a808d8 100644
--- a/sdks/python/apache_beam/coders/observable.py
+++ b/sdks/python/apache_beam/coders/observable.py
@@ -20,7 +20,9 @@
 
 
 class ObservableMixin(object):
-  """An observable iterable.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  An observable iterable.
 
   Subclasses need to call self.notify_observers with any object yielded.
   """

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/coders/slow_stream.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py
index a87495c..85837bc 100644
--- a/sdks/python/apache_beam/coders/slow_stream.py
+++ b/sdks/python/apache_beam/coders/slow_stream.py
@@ -21,7 +21,9 @@ import struct
 
 
 class OutputStream(object):
-  """A pure Python implementation of stream.OutputStream."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A pure Python implementation of stream.OutputStream."""
 
   def __init__(self):
     self.data = []
@@ -69,7 +71,9 @@ class OutputStream(object):
 
 
 class ByteCountingOutputStream(OutputStream):
-  """A pure Python implementation of stream.ByteCountingOutputStream."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A pure Python implementation of stream.ByteCountingOutputStream."""
 
   def __init__(self):
     # Note that we don't actually use any of the data initialized by our super.
@@ -96,7 +100,9 @@ class ByteCountingOutputStream(OutputStream):
 
 
 class InputStream(object):
-  """A pure Python implementation of stream.InputStream."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A pure Python implementation of stream.InputStream."""
 
   def __init__(self, data):
     self.data = data
@@ -149,7 +155,9 @@ class InputStream(object):
 
 
 def get_varint_size(v):
-  """Returns the size of the given integer value when encode as a VarInt."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Returns the size of the given integer value when encoded as a VarInt."""
   if v < 0:
     v += 1 << 64
     if v <= 0:
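
A quick check tying get_varint_size to the pure-Python OutputStream above (both internal, shown for illustration only; 300 encodes to two varint bytes):

    from apache_beam.coders.slow_stream import OutputStream, get_varint_size

    out = OutputStream()
    out.write_var_int64(300)
    assert len(out.get()) == 2
    assert get_varint_size(300) == 2  # agrees with the encoded length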

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/coders/typecoders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py
index 1bd4312..60832c9 100644
--- a/sdks/python/apache_beam/coders/typecoders.py
+++ b/sdks/python/apache_beam/coders/typecoders.py
@@ -160,7 +160,9 @@ class CoderRegistry(object):
 
 
 class FirstOf(object):
-  """A class used to get the first matching coder from a list of coders."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A class used to get the first matching coder from a list of coders."""
 
   def __init__(self, coders):
     self._coders = coders

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/internal/gcp/auth.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py
index 3c8dd64..9f32092 100644
--- a/sdks/python/apache_beam/internal/gcp/auth.py
+++ b/sdks/python/apache_beam/internal/gcp/auth.py
@@ -38,7 +38,9 @@ executing_project = None
 
 
 def set_running_in_gce(worker_executing_project):
-  """Informs the authentication library that we are running in GCE.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Informs the authentication library that we are running in GCE.
 
   When we are running in GCE, we have the option of using the VM metadata
   credentials for authentication to Google services.
@@ -57,8 +59,10 @@ class AuthenticationException(retry.PermanentException):
   pass
 
 
-class GCEMetadataCredentials(OAuth2Credentials):
-  """Credential object initialized using access token from GCE VM metadata."""
+class _GCEMetadataCredentials(OAuth2Credentials):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Credential object initialized using access token from GCE VM metadata."""
 
   def __init__(self, user_agent=None):
     """Create an instance of GCEMetadataCredentials.
@@ -69,7 +73,7 @@ class GCEMetadataCredentials(OAuth2Credentials):
     Args:
       user_agent: string, The HTTP User-Agent to provide for this application.
     """
-    super(GCEMetadataCredentials, self).__init__(
+    super(_GCEMetadataCredentials, self).__init__(
         None,  # access_token
         None,  # client_id
         None,  # client_secret
@@ -94,7 +98,9 @@ class GCEMetadataCredentials(OAuth2Credentials):
 
 
 def get_service_credentials():
-  """Get credentials to access Google services."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Get credentials to access Google services."""
   user_agent = 'beam-python-sdk/1.0'
   if is_running_in_gce:
     # We are currently running as a GCE taskrunner worker.
@@ -102,7 +108,7 @@ def get_service_credentials():
     # TODO(ccy): It's not entirely clear if these credentials are thread-safe.
     # If so, we can cache these credentials to save the overhead of creating
     # them again.
-    return GCEMetadataCredentials(user_agent=user_agent)
+    return _GCEMetadataCredentials(user_agent=user_agent)
   else:
     client_scopes = [
         'https://www.googleapis.com/auth/bigquery',
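
Hedged usage of the helper above (requires the GCP extras; on a GCE worker this now returns the private _GCEMetadataCredentials):

    from apache_beam.internal.gcp import auth

    # Internal API: code inside the SDK obtains credentials like this.
    credentials = auth.get_service_credentials()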

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/internal/gcp/json_value.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/gcp/json_value.py b/sdks/python/apache_beam/internal/gcp/json_value.py
index 523db9c..59f8b60 100644
--- a/sdks/python/apache_beam/internal/gcp/json_value.py
+++ b/sdks/python/apache_beam/internal/gcp/json_value.py
@@ -33,7 +33,9 @@ _MININT64 = - (1 << 63)
 
 
 def get_typed_value_descriptor(obj):
-  """Converts a basic type into a @type/value dictionary.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Converts a basic type into a @type/value dictionary.
 
   Args:
     obj: A basestring, bool, int, or float to be converted.
@@ -59,7 +61,9 @@ def get_typed_value_descriptor(obj):
 
 
 def to_json_value(obj, with_type=False):
-  """Converts Python objects into extra_types.JsonValue objects.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Converts Python objects into extra_types.JsonValue objects.
 
   Args:
     obj: Python object to be converted. Can be 'None'.
@@ -115,7 +119,9 @@ def to_json_value(obj, with_type=False):
 
 
 def from_json_value(v):
-  """Converts extra_types.JsonValue objects into Python objects.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Converts extra_types.JsonValue objects into Python objects.
 
   Args:
     v: JsonValue object to be converted.
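
An illustrative round trip through the two helpers above (assuming the apitools-based GCP extras are installed):

    from apache_beam.internal.gcp.json_value import from_json_value, to_json_value

    jv = to_json_value({'name': 'beam', 'count': 3})
    assert from_json_value(jv) == {'name': 'beam', 'count': 3}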

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/internal/pickler.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py
index ed429fe..4305379 100644
--- a/sdks/python/apache_beam/internal/pickler.py
+++ b/sdks/python/apache_beam/internal/pickler.py
@@ -36,25 +36,25 @@ import zlib
 import dill
 
 
-def is_nested_class(cls):
+def _is_nested_class(cls):
   """Returns true if argument is a class object that appears to be nested."""
   return (isinstance(cls, type)
           and cls.__module__ != '__builtin__'
           and cls.__name__ not in sys.modules[cls.__module__].__dict__)
 
 
-def find_containing_class(nested_class):
+def _find_containing_class(nested_class):
   """Finds containing class of a nestec class passed as argument."""
 
-  def find_containing_class_inner(outer):
+  def _find_containing_class_inner(outer):
     for k, v in outer.__dict__.items():
       if v is nested_class:
         return outer, k
       elif isinstance(v, (type, types.ClassType)) and hasattr(v, '__dict__'):
-        res = find_containing_class_inner(v)
+        res = _find_containing_class_inner(v)
         if res: return res
 
-  return find_containing_class_inner(sys.modules[nested_class.__module__])
+  return _find_containing_class_inner(sys.modules[nested_class.__module__])
 
 
 def _nested_type_wrapper(fun):
@@ -76,8 +76,8 @@ def _nested_type_wrapper(fun):
     # do anything special because the pickler itself will save the constituent
     # parts of the type (i.e., name, base classes, dictionary) and then
     # recreate it during unpickling.
-    if is_nested_class(obj) and obj.__module__ != '__main__':
-      containing_class_and_name = find_containing_class(obj)
+    if _is_nested_class(obj) and obj.__module__ != '__main__':
+      containing_class_and_name = _find_containing_class(obj)
       if containing_class_and_name is not None:
         return pickler.save_reduce(
             getattr, containing_class_and_name, obj=obj)
@@ -108,11 +108,11 @@ dill.dill.Pickler.dispatch[type] = _nested_type_wrapper(
 # Dill pickles generators objects without complaint, but unpickling produces
 # TypeError: object.__new__(generator) is not safe, use generator.__new__()
 # on some versions of Python.
-def reject_generators(unused_pickler, unused_obj):
+def _reject_generators(unused_pickler, unused_obj):
   raise TypeError("can't (safely) pickle generator objects")
 
 
-dill.dill.Pickler.dispatch[types.GeneratorType] = reject_generators
+dill.dill.Pickler.dispatch[types.GeneratorType] = _reject_generators
 
 
 # This if guards against dill not being full initialized when generating docs.
@@ -185,6 +185,8 @@ logging.getLogger('dill').setLevel(logging.WARN)
 # pickler.loads() being used for data, which results in an unnecessary base64
 # encoding.  This should be cleaned up.
 def dumps(o, enable_trace=True):
+  """For internal use only; no backwards-compatibility guarantees."""
+
   try:
     s = dill.dumps(o)
   except Exception:      # pylint: disable=broad-except
@@ -206,6 +208,8 @@ def dumps(o, enable_trace=True):
 
 
 def loads(encoded, enable_trace=True):
+  """For internal use only; no backwards-compatibility guarantees."""
+
   c = base64.b64decode(encoded)
 
   s = zlib.decompress(c)
@@ -224,7 +228,9 @@ def loads(encoded, enable_trace=True):
 
 
 def dump_session(file_path):
-  """Pickle the current python session to be used in the worker.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Pickle the current python session to be used in the worker.
 
   Note: Due to the inconsistency in the first dump of dill dump_session, we
   create and load the dump twice to have consistent results in the worker and
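
A minimal round trip through the internal pickler API marked above; per the loads() implementation, the payload is a base64-encoded, zlib-compressed pickle:

    from apache_beam.internal import pickler

    payload = pickler.dumps([1, 2, 3])
    assert pickler.loads(payload) == [1, 2, 3]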

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/internal/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/util.py b/sdks/python/apache_beam/internal/util.py
index 5b31e88..df4878c 100644
--- a/sdks/python/apache_beam/internal/util.py
+++ b/sdks/python/apache_beam/internal/util.py
@@ -24,7 +24,9 @@ import weakref
 
 
 class ArgumentPlaceholder(object):
-  """A place holder object replacing PValues in argument lists.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A place holder object replacing PValues in argument lists.
 
   A Fn object can take any number of "side inputs", which are PValues that will
   be evaluated during pipeline execution and will be provided to the function
@@ -48,7 +50,9 @@ class ArgumentPlaceholder(object):
 
 
 def remove_objects_from_args(args, kwargs, pvalue_classes):
-  """Replaces all objects of a given type in args/kwargs with a placeholder.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Replaces all objects of a given type in args/kwargs with a placeholder.
 
   Args:
     args: A list of positional arguments.
@@ -77,7 +81,9 @@ def remove_objects_from_args(args, kwargs, pvalue_classes):
 
 
 def insert_values_in_args(args, kwargs, values):
-  """Replaces all placeholders in args/kwargs with actual values.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Replaces all placeholders in args/kwargs with actual values.
 
   Args:
     args: A list of positional arguments.
@@ -100,7 +106,9 @@ def insert_values_in_args(args, kwargs, values):
 
 
 def run_using_threadpool(fn_to_execute, inputs, pool_size):
-  """Runs the given function on given inputs using a thread pool.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Runs the given function on given inputs using a thread pool.
 
   Args:
     fn_to_execute: Function to execute
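
A hedged sketch of the placeholder round trip described above; the exact return shapes are assumptions based on these docstrings:

    from apache_beam.internal.util import (ArgumentPlaceholder,
                                           insert_values_in_args,
                                           remove_objects_from_args)

    class FakePValue(object):  # hypothetical stand-in for a deferred value
      pass

    pval = FakePValue()
    new_args, new_kwargs, removed = remove_objects_from_args(
        [1, pval], {'side': pval}, (FakePValue,))
    assert len(removed) == 2  # both occurrences were pulled out

    # After evaluation, concrete values are substituted back (ordering is
    # assumed to match the removal order):
    args, kwargs = insert_values_in_args(new_args, new_kwargs, ['a', 'b'])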

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/io/concat_source.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source.py b/sdks/python/apache_beam/io/concat_source.py
index 1656180..dfd1695 100644
--- a/sdks/python/apache_beam/io/concat_source.py
+++ b/sdks/python/apache_beam/io/concat_source.py
@@ -25,7 +25,9 @@ from apache_beam.io import iobase
 
 
 class ConcatSource(iobase.BoundedSource):
-  """A ``BoundedSource`` that can group a set of ``BoundedSources``.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A ``BoundedSource`` that can group a set of ``BoundedSources``.
 
   Primarily for internal use; use the ``apache_beam.Flatten`` transform
   to create the union of several reads.
@@ -89,7 +91,9 @@ class ConcatSource(iobase.BoundedSource):
 
 
 class ConcatRangeTracker(iobase.RangeTracker):
-  """Range tracker for ConcatSource"""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Range tracker for ConcatSource."""
 
   def __init__(self, start, end, source_bundles):
     """Initializes ``ConcatRangeTracker``

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/metrics/cells.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py
index 41d24be..fbe3ad3 100644
--- a/sdks/python/apache_beam/metrics/cells.py
+++ b/sdks/python/apache_beam/metrics/cells.py
@@ -31,7 +31,9 @@ from apache_beam.metrics.metricbase import Distribution
 
 
 class CellCommitState(object):
-  """Atomically tracks a cell's dirty/clean commit status.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Atomically tracks a cell's dirty/clean commit status.
 
   Reporting a metric update works in a two-step process: First, updates to the
   metric are received, and the metric is marked as 'dirty'. Later, updates are
@@ -102,7 +104,9 @@ class CellCommitState(object):
 
 
 class MetricCell(object):
-  """Accumulates in-memory changes to a metric.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Accumulates in-memory changes to a metric.
 
   A MetricCell represents a specific metric in a single context and bundle.
   All subclasses must be thread safe, as these are used in the pipeline runners,
@@ -118,7 +122,9 @@ class MetricCell(object):
 
 
 class CounterCell(Counter, MetricCell):
-  """Tracks the current value and delta of a counter metric.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Tracks the current value and delta of a counter metric.
 
   Each cell tracks the state of a metric independently per context per bundle.
   Therefore, each metric has a different cell in each bundle, cells are
@@ -146,7 +152,9 @@ class CounterCell(Counter, MetricCell):
 
 
 class DistributionCell(Distribution, MetricCell):
-  """Tracks the current value and delta for a distribution metric.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Tracks the current value and delta for a distribution metric.
 
   Each cell tracks the state of a metric independently per context per bundle.
   Therefore, each metric has a different cell in each bundle, that is later
@@ -228,7 +236,9 @@ class DistributionResult(object):
 
 
 class DistributionData(object):
-  """The data structure that holds data about a distribution metric.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  The data structure that holds data about a distribution metric.
 
   Distribution metrics are restricted to distributions of integers only.
 
@@ -280,7 +290,9 @@ class DistributionData(object):
 
 
 class MetricAggregator(object):
-  """Base interface for aggregating metric data during pipeline execution."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Base interface for aggregating metric data during pipeline execution."""
   def zero(self):
     raise NotImplementedError
 
@@ -292,7 +304,9 @@ class MetricAggregator(object):
 
 
 class CounterAggregator(MetricAggregator):
-  """Aggregator for Counter metric data during pipeline execution.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Aggregator for Counter metric data during pipeline execution.
 
   Values aggregated should be ``int`` objects.
   """
@@ -307,7 +321,9 @@ class CounterAggregator(MetricAggregator):
 
 
 class DistributionAggregator(MetricAggregator):
-  """Aggregator for Distribution metric data during pipeline execution.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Aggregator for Distribution metric data during pipeline execution.
 
   Values aggregated should be ``DistributionData`` objects.
   """

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/metrics/execution.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py
index dbd0533..a06ec0c 100644
--- a/sdks/python/apache_beam/metrics/execution.py
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -16,7 +16,7 @@
 #
 
 """
-Internal classes for Metrics API.
+This module is for internal use only; no backwards-compatibility guarantees.
 
 The classes in this file keep shared state, and organize metrics information.
 
@@ -36,7 +36,9 @@ from apache_beam.metrics.cells import CounterCell, DistributionCell
 
 
 class MetricKey(object):
-  """Key used to identify instance of metric cell.
+  """
+
+  Key used to identify instance of metric cell.
 
   Metrics are internally keyed by the step name they are associated with and
   the name of the metric.
@@ -193,6 +195,7 @@ class MetricsContainer(object):
 
 
 class ScopedMetricsContainer(object):
+
   def __init__(self, container=None):
     self._stack = MetricsEnvironment.container_stack()
     self._container = container

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/api/beam_fn_api_pb2.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/api/beam_fn_api_pb2.py b/sdks/python/apache_beam/runners/api/beam_fn_api_pb2.py
index 769b13c..cb0b72b 100644
--- a/sdks/python/apache_beam/runners/api/beam_fn_api_pb2.py
+++ b/sdks/python/apache_beam/runners/api/beam_fn_api_pb2.py
@@ -33,6 +33,8 @@ _sym_db = _symbol_database.Default()
 from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2
 from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2
 
+# This module is experimental. No backwards-compatibility guarantees.
+
 
 DESCRIPTOR = _descriptor.FileDescriptor(
   name='beam_fn_api.proto',

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/api/beam_fn_api_pb2_grpc.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/api/beam_fn_api_pb2_grpc.py b/sdks/python/apache_beam/runners/api/beam_fn_api_pb2_grpc.py
index 7fbe4c1..08d7dad 100644
--- a/sdks/python/apache_beam/runners/api/beam_fn_api_pb2_grpc.py
+++ b/sdks/python/apache_beam/runners/api/beam_fn_api_pb2_grpc.py
@@ -22,6 +22,8 @@ from grpc.framework.interfaces.face import utilities as face_utilities
 
 import beam_fn_api_pb2 as beam__fn__api__pb2
 
+# This module is experimental. No backwards-compatibility guarantees.
+
 
 class BeamFnControlStub(object):
   """

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py b/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
index d2006d7..e8793b6 100644
--- a/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
+++ b/sdks/python/apache_beam/runners/api/beam_runner_api_pb2.py
@@ -33,6 +33,8 @@ _sym_db = _symbol_database.Default()
 
 from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2
 
+# This module is experimental. No backwards-compatibility guarantees.
+
 
 DESCRIPTOR = _descriptor.FileDescriptor(
   name='beam_runner_api.proto',

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/common.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 53f7aa8..dcfac2e 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -43,7 +43,7 @@ cdef class DoFnSignature(object):
 
 cdef class DoFnInvoker(object):
   cdef public DoFnSignature signature
-  cdef OutputProcessor output_processor
+  cdef _OutputProcessor output_processor
 
   cpdef invoke_process(self, WindowedValue windowed_value)
   cpdef invoke_start_bundle(self)
@@ -77,7 +77,7 @@ cdef class DoFnRunner(Receiver):
   cpdef process(self, WindowedValue windowed_value)
 
 
-cdef class OutputProcessor(object):
+cdef class _OutputProcessor(object):
   cdef object window_fn
   cdef Receiver main_receivers
   cdef object tagged_receivers

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index ec1f5dc..0aef0a1 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -32,6 +32,7 @@ from apache_beam.utils.windowed_value import WindowedValue
 
 
 class LoggingContext(object):
+  """For internal use only; no backwards-compatibility guarantees."""
 
   def enter(self):
     pass
@@ -41,7 +42,9 @@ class LoggingContext(object):
 
 
 class Receiver(object):
-  """An object that consumes a WindowedValue.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  An object that consumes a WindowedValue.
 
   This class can be efficiently used to pass values between the
   SDK and worker harnesses.
@@ -52,7 +55,9 @@ class Receiver(object):
 
 
 class DoFnMethodWrapper(object):
-  """Represents a method of a DoFn object."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Represents a method of a DoFn object."""
 
   def __init__(self, do_fn, method_name):
     """
@@ -299,7 +304,9 @@ class PerWindowInvoker(DoFnInvoker):
 
 
 class DoFnRunner(Receiver):
-  """A helper class for executing ParDo operations.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A helper class for executing ParDo operations.
   """
 
   def __init__(self,
@@ -361,7 +368,7 @@ class DoFnRunner(Receiver):
 
     # Optimize for the common case.
     main_receivers = as_receiver(tagged_receivers[None])
-    output_processor = OutputProcessor(
+    output_processor = _OutputProcessor(
         windowing.windowfn, main_receivers, tagged_receivers)
 
     self.do_fn_invoker = DoFnInvoker.create_invoker(
@@ -411,11 +418,11 @@ class DoFnRunner(Receiver):
       raise
 
 
-class OutputProcessor(object):
+class _OutputProcessor(object):
   """Processes output produced by DoFn method invocations."""
 
   def __init__(self, window_fn, main_receivers, tagged_receivers):
-    """Initializes ``OutputProcessor``.
+    """Initializes ``_OutputProcessor``.
 
     Args:
       window_fn: a windowing function (WindowFn).
@@ -497,7 +504,7 @@ class OutputProcessor(object):
         self.tagged_receivers[tag].output(windowed_value)
 
 
-class NoContext(WindowFn.AssignContext):
+class _NoContext(WindowFn.AssignContext):
   """An uninspectable WindowFn.AssignContext."""
   NO_VALUE = object()
 
@@ -518,7 +525,9 @@ class NoContext(WindowFn.AssignContext):
 
 
 class DoFnState(object):
-  """Keeps track of state that DoFns want, currently, user counters.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Keeps track of state that DoFns want; currently, user counters.
   """
 
   def __init__(self, counter_factory):
@@ -533,6 +542,7 @@ class DoFnState(object):
 
 # TODO(robertwb): Replace core.DoFnContext with this.
 class DoFnContext(object):
+  """For internal use only; no backwards-compatibility guarantees."""
 
   def __init__(self, label, element=None, state=None):
     self.label = label
@@ -597,6 +607,8 @@ class _ReceiverAdapter(Receiver):
 
 
 def as_receiver(maybe_receiver):
+  """For internal use only; no backwards-compatibility guarantees."""
+
   if isinstance(maybe_receiver, Receiver):
     return maybe_receiver
   return _ReceiverAdapter(maybe_receiver)
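
A short sketch of the as_receiver() adapter shown above (PrintingReceiver is hypothetical):

    from apache_beam.runners.common import Receiver, as_receiver

    class PrintingReceiver(Receiver):  # hypothetical Receiver implementation

      def receive(self, windowed_value):
        print(windowed_value)

    r = PrintingReceiver()
    assert as_receiver(r) is r  # already a Receiver, so returned unchanged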

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 220ceb8..ea49593 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -15,7 +15,9 @@
 # limitations under the License.
 #
 
-"""Dataflow client utility functions."""
+""" For internal use only. No backwards compatibility guarantees.
+
+Dataflow client utility functions."""
 
 import codecs
 import getpass

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index 63e593c..892d9f9 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -244,7 +244,9 @@ def _populate_requirements_cache(requirements_file, cache_dir):
 def stage_job_resources(
     options, file_copy=_dependency_file_copy, build_setup_args=None,
     temp_dir=None, populate_requirements_cache=_populate_requirements_cache):
-  """Creates (if needed) and stages job resources to options.staging_location.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Creates (if needed) and stages job resources to options.staging_location.
 
   Args:
     options: Command line options. More specifically the function will expect
@@ -467,7 +469,9 @@ def _stage_beam_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
 
 
 def get_required_container_version():
-  """Returns the Google Cloud Dataflow container version for remote execution.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Returns the Google Cloud Dataflow container version for remote execution.
   """
   # TODO(silviuc): Handle apache-beam versions when we have official releases.
   import pkg_resources as pkg
@@ -487,7 +491,9 @@ def get_required_container_version():
 
 
 def get_sdk_name_and_version():
-  """Returns name and version of SDK reported to Google Cloud Dataflow."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Returns name and version of SDK reported to Google Cloud Dataflow."""
   # TODO(ccy): Make this check cleaner.
   container_version = get_required_container_version()
   if container_version == BEAM_CONTAINER_VERSION:
@@ -496,7 +502,9 @@ def get_sdk_name_and_version():
 
 
 def get_sdk_package_name():
-  """Returns the PyPI package name to be staged to Google Cloud Dataflow."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Returns the PyPI package name to be staged to Google Cloud Dataflow."""
   container_version = get_required_container_version()
   if container_version == BEAM_CONTAINER_VERSION:
     return BEAM_PACKAGE_NAME
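
Illustrative calls to the helpers marked above; the (name, version) return shape of get_sdk_name_and_version is an assumption based on its docstring:

    from apache_beam.runners.dataflow.internal import dependency

    container_version = dependency.get_required_container_version()
    sdk_name, sdk_version = dependency.get_sdk_name_and_version()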

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/dataflow/internal/names.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index 182f27e..be67224 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -29,7 +29,9 @@ SERIALIZED_SOURCE_KEY = 'serialized_source'
 
 
 class TransformNames(object):
-  """Transform strings as they are expected in the CloudWorkflow protos."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Transform strings as they are expected in the CloudWorkflow protos."""
   COLLECTION_TO_SINGLETON = 'CollectionToSingleton'
   COMBINE = 'CombineValues'
   CREATE_PCOLLECTION = 'CreateCollection'
@@ -41,7 +43,9 @@ class TransformNames(object):
 
 
 class PropertyNames(object):
-  """Property strings as they are expected in the CloudWorkflow protos."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Property strings as they are expected in the CloudWorkflow protos."""
   BIGQUERY_CREATE_DISPOSITION = 'create_disposition'
   BIGQUERY_DATASET = 'dataset'
   BIGQUERY_QUERY = 'bigquery_query'
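
These are plain string constants; for example (values as shown in this diff):

    from apache_beam.runners.dataflow.internal.names import TransformNames

    assert TransformNames.COMBINE == 'CombineValues'
    assert TransformNames.CREATE_PCOLLECTION == 'CreateCollection'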

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
index 188b4af..c1f4238 100644
--- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
+++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
@@ -16,6 +16,8 @@
 #
 
 """Dataflow native sources and sinks.
+
+For internal use only; no backwards-compatibility guarantees.
 """
 
 import logging

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/direct/bundle_factory.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/bundle_factory.py b/sdks/python/apache_beam/runners/direct/bundle_factory.py
index 42c8095..ed00b03 100644
--- a/sdks/python/apache_beam/runners/direct/bundle_factory.py
+++ b/sdks/python/apache_beam/runners/direct/bundle_factory.py
@@ -24,7 +24,9 @@ from apache_beam.utils.windowed_value import WindowedValue
 
 
 class BundleFactory(object):
-  """BundleFactory creates output bundles to be used by transform evaluators.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  BundleFactory creates output bundles to be used by transform evaluators.
 
   Args:
     stacked: whether or not to stack the WindowedValues within the bundle
@@ -36,7 +38,7 @@ class BundleFactory(object):
     self._stacked = stacked
 
   def create_bundle(self, output_pcollection):
-    return Bundle(output_pcollection, self._stacked)
+    return _Bundle(output_pcollection, self._stacked)
 
   def create_empty_committed_bundle(self, output_pcollection):
     bundle = self.create_bundle(output_pcollection)
@@ -45,7 +47,7 @@ class BundleFactory(object):
 
 
 # a bundle represents a unit of work that will be processed by a transform.
-class Bundle(object):
+class _Bundle(object):
   """Part of a PCollection with output elements.
 
   Part of a PCollection. Elements are output to a bundle, which will cause them
@@ -67,7 +69,7 @@ class Bundle(object):
     b = Bundle(stacked=False)
   """
 
-  class StackedWindowedValues(object):
+  class _StackedWindowedValues(object):
     """A stack of WindowedValues with the same timestamp and windows.
 
     It must be initialized from a single WindowedValue.
@@ -131,7 +133,7 @@ class Bundle(object):
 
     def iterable_stacked_or_elements(elements):
       for e in elements:
-        if isinstance(e, Bundle.StackedWindowedValues):
+        if isinstance(e, _Bundle._StackedWindowedValues):
           for w in e.windowed_values():
             yield w
         else:
@@ -171,11 +173,11 @@ class Bundle(object):
       return
     if (self._elements and
         (isinstance(self._elements[-1], (WindowedValue,
-                                         Bundle.StackedWindowedValues))) and
+                                         _Bundle._StackedWindowedValues))) and
         self._elements[-1].timestamp == element.timestamp and
         self._elements[-1].windows == element.windows):
       if isinstance(self._elements[-1], WindowedValue):
-        self._elements[-1] = Bundle.StackedWindowedValues(self._elements[-1])
+        self._elements[-1] = _Bundle._StackedWindowedValues(self._elements[-1])
       self._elements[-1].add_value(element.value)
     else:
       self._elements.append(element)
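
With _Bundle now private, bundles are obtained through the factory; a hedged sketch (the None stands in for the output PCollection the DirectRunner would pass):

    from apache_beam.runners.direct.bundle_factory import BundleFactory

    factory = BundleFactory(stacked=True)  # stack values sharing timestamp/windows
    bundle = factory.create_bundle(None)   # None: hypothetical PCollection slot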

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/direct/clock.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/clock.py b/sdks/python/apache_beam/runners/direct/clock.py
index dd1800a..84d52f7 100644
--- a/sdks/python/apache_beam/runners/direct/clock.py
+++ b/sdks/python/apache_beam/runners/direct/clock.py
@@ -23,6 +23,7 @@ import time
 
 
 class Clock(object):
+  """For internal use only; no backwards-compatibility guarantees."""
 
   def time(self):
     """Returns the number of milliseconds since epoch."""
@@ -30,7 +31,9 @@ class Clock(object):
 
 
 class MockClock(Clock):
-  """Mock clock implementation for testing."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Mock clock implementation for testing."""
 
   def __init__(self, now_in_ms):
     self._now_in_ms = now_in_ms

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py
index cdfadb7..d625d3c 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py
@@ -24,7 +24,9 @@ from apache_beam.pipeline import PipelineVisitor
 
 
 class ConsumerTrackingPipelineVisitor(PipelineVisitor):
-  """Visitor for extracting value-consumer relations from the graph.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Visitor for extracting value-consumer relations from the graph.
 
   Tracks the AppliedPTransforms that consume each PValue in the Pipeline. This
   is used to schedule consuming PTransforms to consume input after the upstream
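
A sketch of how the DirectRunner applies this visitor to a pipeline:

    import apache_beam as beam
    from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import (
        ConsumerTrackingPipelineVisitor)

    p = beam.Pipeline()
    _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x + 1)
    visitor = ConsumerTrackingPipelineVisitor()
    p.visit(visitor)  # visitor now maps each PValue to its consuming transforms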

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 0aa1aec..9efbede 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -30,7 +30,7 @@ from apache_beam.metrics.execution import MetricsContainer
 from apache_beam.metrics.execution import ScopedMetricsContainer
 
 
-class ExecutorService(object):
+class _ExecutorService(object):
   """Thread pool for executing tasks in parallel."""
 
   class CallableTask(object):
@@ -42,14 +42,14 @@ class ExecutorService(object):
     def name(self):
       return None
 
-  class ExecutorServiceWorker(threading.Thread):
+  class _ExecutorServiceWorker(threading.Thread):
     """Worker thread for executing a single task at a time."""
 
     # Amount to block waiting for getting an item from the queue in seconds.
     TIMEOUT = 5
 
     def __init__(self, queue, index):
-      super(ExecutorService.ExecutorServiceWorker, self).__init__()
+      super(_ExecutorService._ExecutorServiceWorker, self).__init__()
       self.queue = queue
       self._index = index
       self._default_name = 'ExecutorServiceWorker-' + str(index)
@@ -70,7 +70,7 @@ class ExecutorService(object):
         # Do not block indefinitely, otherwise we may not act for a requested
         # shutdown.
         return self.queue.get(
-            timeout=ExecutorService.ExecutorServiceWorker.TIMEOUT)
+            timeout=_ExecutorService._ExecutorServiceWorker.TIMEOUT)
       except Queue.Empty:
         return None
 
@@ -92,12 +92,12 @@ class ExecutorService(object):
 
   def __init__(self, num_workers):
     self.queue = Queue.Queue()
-    self.workers = [ExecutorService.ExecutorServiceWorker(
+    self.workers = [_ExecutorService._ExecutorServiceWorker(
         self.queue, i) for i in range(num_workers)]
     self.shutdown_requested = False
 
   def submit(self, task):
-    assert isinstance(task, ExecutorService.CallableTask)
+    assert isinstance(task, _ExecutorService.CallableTask)
     if not self.shutdown_requested:
       self.queue.put(task)
 
@@ -122,7 +122,7 @@ class ExecutorService(object):
     # last task).
 
 
-class TransformEvaluationState(object):
+class _TransformEvaluationState(object):
 
   def __init__(self, executor_service, scheduled):
     self.executor_service = executor_service
@@ -136,7 +136,7 @@ class TransformEvaluationState(object):
     self.scheduled.remove(completed_work)
 
 
-class ParallelEvaluationState(TransformEvaluationState):
+class _ParallelEvaluationState(_TransformEvaluationState):
   """A TransformEvaluationState with unlimited parallelism.
 
   Any TransformExecutor scheduled will be immediately submitted to the
@@ -148,7 +148,7 @@ class ParallelEvaluationState(TransformEvaluationState):
   pass
 
 
-class SerialEvaluationState(TransformEvaluationState):
+class _SerialEvaluationState(_TransformEvaluationState):
   """A TransformEvaluationState with a single work queue.
 
   Any TransformExecutor scheduled will be placed on the work queue. Only one
@@ -159,14 +159,14 @@ class SerialEvaluationState(TransformEvaluationState):
   """
 
   def __init__(self, executor_service, scheduled):
-    super(SerialEvaluationState, self).__init__(executor_service, scheduled)
+    super(_SerialEvaluationState, self).__init__(executor_service, scheduled)
     self.serial_queue = collections.deque()
     self.currently_evaluating = None
     self._lock = threading.Lock()
 
   def complete(self, completed_work):
     self._update_currently_evaluating(None, completed_work)
-    super(SerialEvaluationState, self).complete(completed_work)
+    super(_SerialEvaluationState, self).complete(completed_work)
 
   def schedule(self, new_work):
     self._update_currently_evaluating(new_work, None)
@@ -181,10 +181,10 @@ class SerialEvaluationState(TransformEvaluationState):
       if self.serial_queue and not self.currently_evaluating:
         next_work = self.serial_queue.pop()
         self.currently_evaluating = next_work
-        super(SerialEvaluationState, self).schedule(next_work)
+        super(_SerialEvaluationState, self).schedule(next_work)
 
 
-class TransformExecutorServices(object):
+class _TransformExecutorServices(object):
   """Schedules and completes TransformExecutors.
 
   Controls the concurrency as appropriate for the applied transform the executor
@@ -194,7 +194,7 @@ class TransformExecutorServices(object):
   def __init__(self, executor_service):
     self._executor_service = executor_service
     self._scheduled = set()
-    self._parallel = ParallelEvaluationState(
+    self._parallel = _ParallelEvaluationState(
         self._executor_service, self._scheduled)
     self._serial_cache = WeakValueDictionary()
 
@@ -204,7 +204,7 @@ class TransformExecutorServices(object):
   def serial(self, step):
     cached = self._serial_cache.get(step)
     if not cached:
-      cached = SerialEvaluationState(self._executor_service, self._scheduled)
+      cached = _SerialEvaluationState(self._executor_service, self._scheduled)
       self._serial_cache[step] = cached
     return cached
 
@@ -230,17 +230,19 @@ class _CompletionCallback(object):
     output_committed_bundles = self._evaluation_context.handle_result(
         input_committed_bundle, self._timers, transform_result)
     for output_committed_bundle in output_committed_bundles:
-      self._all_updates.offer(_ExecutorServiceParallelExecutor.ExecutorUpdate(
+      self._all_updates.offer(_ExecutorServiceParallelExecutor._ExecutorUpdate(
           output_committed_bundle, None))
     return output_committed_bundles
 
   def handle_exception(self, exception):
     self._all_updates.offer(
-        _ExecutorServiceParallelExecutor.ExecutorUpdate(None, exception))
+        _ExecutorServiceParallelExecutor._ExecutorUpdate(None, exception))
 
 
-class TransformExecutor(ExecutorService.CallableTask):
-  """TransformExecutor will evaluate a bundle using an applied ptransform.
+class TransformExecutor(_ExecutorService.CallableTask):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  TransformExecutor will evaluate a bundle using an applied ptransform.
 
   A CallableTask responsible for constructing a TransformEvaluator and
   evaluating it on some bundle of input, and registering the result using the
@@ -316,6 +318,7 @@ class TransformExecutor(ExecutorService.CallableTask):
 
 
 class Executor(object):
+  """For internal use only; no backwards-compatibility guarantees."""
 
   def __init__(self, *args, **kwargs):
     self._executor = _ExecutorServiceParallelExecutor(*args, **kwargs)
@@ -334,17 +337,17 @@ class _ExecutorServiceParallelExecutor(object):
 
   def __init__(self, value_to_consumers, transform_evaluator_registry,
                evaluation_context):
-    self.executor_service = ExecutorService(
+    self.executor_service = _ExecutorService(
         _ExecutorServiceParallelExecutor.NUM_WORKERS)
-    self.transform_executor_services = TransformExecutorServices(
+    self.transform_executor_services = _TransformExecutorServices(
         self.executor_service)
     self.value_to_consumers = value_to_consumers
     self.transform_evaluator_registry = transform_evaluator_registry
     self.evaluation_context = evaluation_context
     self.all_updates = _ExecutorServiceParallelExecutor._TypedUpdateQueue(
-        _ExecutorServiceParallelExecutor.ExecutorUpdate)
+        _ExecutorServiceParallelExecutor._ExecutorUpdate)
     self.visible_updates = _ExecutorServiceParallelExecutor._TypedUpdateQueue(
-        _ExecutorServiceParallelExecutor.VisibleExecutorUpdate)
+        _ExecutorServiceParallelExecutor._VisibleExecutorUpdate)
     self.default_completion_callback = _CompletionCallback(
         evaluation_context, self.all_updates)
 
@@ -412,7 +415,7 @@ class _ExecutorServiceParallelExecutor(object):
       assert isinstance(item, self._item_type)
       self._queue.put_nowait(item)
 
-  class ExecutorUpdate(object):
+  class _ExecutorUpdate(object):
     """An internal status update on the state of the executor."""
 
     def __init__(self, produced_bundle=None, exception=None):
@@ -425,7 +428,7 @@ class _ExecutorServiceParallelExecutor(object):
         # Not the right exception.
         self.exc_info = (exception, None, None)
 
-  class VisibleExecutorUpdate(object):
+  class _VisibleExecutorUpdate(object):
     """An update of interest to the user.
 
     Used for awaiting the completion to decide whether to return normally or
@@ -437,7 +440,7 @@ class _ExecutorServiceParallelExecutor(object):
       self.exception = exc_info[1] or exc_info[0]
       self.exc_info = exc_info
 
-  class _MonitorTask(ExecutorService.CallableTask):
+  class _MonitorTask(_ExecutorService.CallableTask):
     """MonitorTask continuously runs to ensure that pipeline makes progress."""
 
     def __init__(self, executor):
@@ -458,7 +461,7 @@ class _ExecutorServiceParallelExecutor(object):
             logging.warning('A task failed with exception.\n %s',
                             update.exception)
             self._executor.visible_updates.offer(
-                _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(
+                _ExecutorServiceParallelExecutor._VisibleExecutorUpdate(
                     update.exc_info))
           update = self._executor.all_updates.poll()
         self._executor.evaluation_context.schedule_pending_unblocked_tasks(
@@ -467,7 +470,7 @@ class _ExecutorServiceParallelExecutor(object):
       except Exception as e:  # pylint: disable=broad-except
         logging.error('Monitor task died due to exception.\n %s', e)
         self._executor.visible_updates.offer(
-            _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(
+            _ExecutorServiceParallelExecutor._VisibleExecutorUpdate(
                 sys.exc_info()))
       finally:
         if not self._should_shutdown():
@@ -497,11 +500,11 @@ class _ExecutorServiceParallelExecutor(object):
       else:
         if self._executor.evaluation_context.is_done():
           self._executor.visible_updates.offer(
-              _ExecutorServiceParallelExecutor.VisibleExecutorUpdate())
+              _ExecutorServiceParallelExecutor._VisibleExecutorUpdate())
         else:
           # Nothing is scheduled for execution, but watermarks are incomplete.
           self._executor.visible_updates.offer(
-              _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(
+              _ExecutorServiceParallelExecutor._VisibleExecutorUpdate(
                   (Exception('Monitor task detected a pipeline stall.'),
                    None,
                    None)))

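The _TypedUpdateQueue and update-class renames above all serve one pattern:
producers offer a single, asserted update type and the monitor task polls
without blocking. A minimal standalone sketch of that pattern, using only the
standard library (names here are illustrative, not the SDK's):

    import queue  # the 'Queue' module on the Python 2 interpreters of this era

    class TypedUpdateQueue(object):
        """Illustrative: a queue restricted to one update type."""

        def __init__(self, item_type):
            self._item_type = item_type
            self._queue = queue.Queue()

        def offer(self, item):
            # Fail fast if a producer enqueues the wrong kind of update.
            assert isinstance(item, self._item_type)
            self._queue.put_nowait(item)

        def poll(self):
            # Non-blocking poll: None means no pending update.
            try:
                return self._queue.get_nowait()
            except queue.Empty:
                return None
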
http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index f6cdd41..6984ded 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -41,7 +41,9 @@ from apache_beam.options.pipeline_options import TypeOptions
 
 
 class TransformEvaluatorRegistry(object):
-  """Creates instances of TransformEvaluator for the application of a transform.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Creates instances of TransformEvaluator for the application of a transform.
   """
 
   def __init__(self, evaluation_context):
@@ -253,7 +255,7 @@ class _TaggedReceivers(dict):
     def output(self, element):
       pass
 
-  class InMemoryReceiver(object):
+  class _InMemoryReceiver(object):
     """Buffers undeclared outputs to the given dictionary."""
 
     def __init__(self, target, tag):
@@ -267,7 +269,7 @@ class _TaggedReceivers(dict):
     if self._evaluation_context.has_cache:
       if not self._undeclared_in_memory_tag_values:
         self._undeclared_in_memory_tag_values = collections.defaultdict(list)
-      receiver = _TaggedReceivers.InMemoryReceiver(
+      receiver = _TaggedReceivers._InMemoryReceiver(
           self._undeclared_in_memory_tag_values, key)
     else:
       if not self._null_receiver:

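The newly private _InMemoryReceiver is a small buffering receiver: every
element sent to it is appended to a per-tag list in a shared dictionary.
Roughly, as a sketch (illustrative, not the SDK class itself):

    import collections

    class InMemoryReceiver(object):
        """Illustrative: buffer outputs into a dict keyed by output tag."""

        def __init__(self, target, tag):
            self._target = target  # e.g. a defaultdict(list)
            self._tag = tag

        def output(self, element):
            self._target[self._tag].append(element)

    undeclared = collections.defaultdict(list)
    InMemoryReceiver(undeclared, 'side').output('element')
    # undeclared == {'side': ['element']}
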
http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/direct/transform_result.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_result.py b/sdks/python/apache_beam/runners/direct/transform_result.py
index 8ae0aea..febdd20 100644
--- a/sdks/python/apache_beam/runners/direct/transform_result.py
+++ b/sdks/python/apache_beam/runners/direct/transform_result.py
@@ -21,7 +21,9 @@ from __future__ import absolute_import
 
 
 class TransformResult(object):
-  """The result of evaluating an AppliedPTransform with a TransformEvaluator."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  The result of evaluating an AppliedPTransform with a TransformEvaluator."""
 
   def __init__(self, applied_ptransform, uncommitted_output_bundles, state,
                timer_update, counters, watermark_hold,

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/direct/watermark_manager.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index 7793478..3a13539 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -28,7 +28,9 @@ from apache_beam.utils.timestamp import MIN_TIMESTAMP
 
 
 class WatermarkManager(object):
-  """Tracks and updates watermarks for all AppliedPTransforms."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Tracks and updates watermarks for all AppliedPTransforms."""
 
   WATERMARK_POS_INF = MAX_TIMESTAMP
   WATERMARK_NEG_INF = MIN_TIMESTAMP
@@ -41,12 +43,12 @@ class WatermarkManager(object):
     self._transform_to_watermarks = {}
 
     for root_transform in root_transforms:
-      self._transform_to_watermarks[root_transform] = TransformWatermarks(
+      self._transform_to_watermarks[root_transform] = _TransformWatermarks(
           self._clock)
 
     for consumers in value_to_consumers.values():
       for consumer in consumers:
-        self._transform_to_watermarks[consumer] = TransformWatermarks(
+        self._transform_to_watermarks[consumer] = _TransformWatermarks(
             self._clock)
 
     for consumers in value_to_consumers.values():
@@ -139,7 +141,7 @@ class WatermarkManager(object):
     return all_timers
 
 
-class TransformWatermarks(object):
+class _TransformWatermarks(object):
   """Tracks input and output watermarks for an AppliedPTransform."""
 
   def __init__(self, clock):

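The constructor changes above also show how the manager is wired: one
_TransformWatermarks tracker is allocated per AppliedPTransform, for roots and
consumers alike, before watermarks are propagated between them. The allocation
step in outline (illustrative stand-ins, not the SDK code):

    class TransformWatermarks(object):
        """Illustrative: input/output watermark pair for one transform."""

        def __init__(self, neg_inf=float('-inf')):
            self.input_watermark = neg_inf   # stand-in for MIN_TIMESTAMP
            self.output_watermark = neg_inf

    def allocate_trackers(root_transforms, value_to_consumers):
        trackers = {root: TransformWatermarks() for root in root_transforms}
        for consumers in value_to_consumers.values():
            for consumer in consumers:
                trackers[consumer] = TransformWatermarks()
        return trackers
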
http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/pipeline_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
index 4f82774..d3d3c24 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -60,7 +60,9 @@ class _PipelineContextMap(object):
 
 
 class PipelineContext(object):
-  """Used for accessing and constructing the referenced objects of a Pipeline.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Used for accessing and constructing the referenced objects of a Pipeline.
   """
 
   _COMPONENT_TYPES = {

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 5802c17..2635559 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -39,6 +39,8 @@ from apache_beam.runners.worker import data_plane
 from apache_beam.runners.worker import operation_specs
 from apache_beam.runners.worker import sdk_worker
 
+# This module is experimental. No backwards-compatibility guarantees.
+
 
 def streaming_rpc_handler(cls, method_name):
   """Un-inverts the flow of control between the runner and the sdk harness."""

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
index 3e08780..077871e 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
@@ -44,6 +44,8 @@ from apache_beam.typehints import typehints
 from apache_beam.utils import profiler
 from apache_beam.utils.counters import CounterFactory
 
+# This module is experimental. No backwards-compatibility guarantees.
+
 
 class MapTaskExecutorRunner(PipelineRunner):
   """Beam runner translating a pipeline into map tasks that are then executed.

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index b35cb7b..d875fdc 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -56,7 +56,9 @@ _ALL_KNOWN_RUNNERS = (
 
 
 def create_runner(runner_name):
-  """Creates a runner instance from a runner class name.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Creates a runner instance from a runner class name.
 
   Args:
     runner_name: Name of the pipeline runner. Possible values are:
@@ -170,7 +172,9 @@ class PipelineRunner(object):
 
 
 class PValueCache(object):
-  """Local cache for arbitrary information computed for PValue objects."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Local cache for arbitrary information computed for PValue objects."""
 
   def __init__(self, use_disk_backed_cache=False):
     # Cache of values computed while a runner executes a pipeline. This is a

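For orientation, create_runner remains the entry point for resolving a runner
by name even though it is now flagged internal-only. A typical call, assuming
'DirectRunner' (one of the documented runner names):

    from apache_beam.runners.runner import create_runner

    # Resolves the simple class name against the known runners; per the full
    # docstring, a fully qualified 'package.module.RunnerClass' path should
    # also be accepted.
    runner = create_runner('DirectRunner')
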
http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/worker/data_plane.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py
index 6425447..5edd0b4 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -31,6 +31,8 @@ from apache_beam.coders import coder_impl
 from apache_beam.runners.api import beam_fn_api_pb2
 import grpc
 
+# This module is experimental. No backwards-compatibility guarantees.
+
 
 class ClosableOutputStream(type(coder_impl.create_OutputStream())):
   """An OutputStream for use with CoderImpls that has a close() method."""

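ClosableOutputStream's base class is worth a note: it derives from
type(coder_impl.create_OutputStream()), i.e. from whatever concrete stream
class the factory returns at runtime (pure Python or compiled). The same
dynamic-base trick against a stand-in factory (a sketch only; the real close
hook and stream API differ):

    import io

    def create_output_stream():
        # Stand-in for coder_impl.create_OutputStream(); the real factory
        # may hand back a compiled (Cython) implementation instead.
        return io.BytesIO()

    class ClosableOutputStream(type(create_output_stream())):
        """Illustrative: add a close-time callback to the factory's type."""

        def __init__(self, close_callback=None):
            super(ClosableOutputStream, self).__init__()
            self._close_callback = close_callback

        def close(self):
            # The sketch deliberately skips delegating to the underlying close.
            if self._close_callback:
                self._close_callback(self.getvalue())

    captured = []
    stream = ClosableOutputStream(close_callback=captured.append)
    stream.write(b'data')
    stream.close()  # captured == [b'data']
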
http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/worker/log_handler.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/log_handler.py b/sdks/python/apache_beam/runners/worker/log_handler.py
index b9e36ad..59ffbf4 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler.py
@@ -24,6 +24,8 @@ import threading
 from apache_beam.runners.api import beam_fn_api_pb2
 import grpc
 
+# This module is experimental. No backwards-compatibility guarantees.
+
 
 class FnApiLogRecordHandler(logging.Handler):
   """A handler that writes log records to the fn API."""

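FnApiLogRecordHandler uses the standard logging extension point: subclass
logging.Handler and override emit() to push each record to a sink, here the
Fn API log stream. The generic skeleton of any such handler (illustrative;
the real handler forwards records to a gRPC stream):

    import logging

    class ForwardingLogHandler(logging.Handler):
        """Illustrative: forward each formatted record to an external sink."""

        def __init__(self, sink):
            super(ForwardingLogHandler, self).__init__()
            self._sink = sink  # anything with append(); a queue in practice

        def emit(self, record):
            self._sink.append(self.format(record))

    captured = []
    logging.getLogger(__name__).addHandler(ForwardingLogHandler(captured))
    logging.getLogger(__name__).warning('hello')  # lands in captured
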
http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/worker/logger.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/logger.py b/sdks/python/apache_beam/runners/worker/logger.py
index 217dc58..0433538 100644
--- a/sdks/python/apache_beam/runners/worker/logger.py
+++ b/sdks/python/apache_beam/runners/worker/logger.py
@@ -24,6 +24,8 @@ import traceback
 
 from apache_beam.runners.common import LoggingContext
 
+# This module is experimental. No backwards-compatibility guarantees.
+
 
 # Per-thread worker information. This is used only for logging to set
 # context information that changes while work items get executed:

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/worker/opcounters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/opcounters.py b/sdks/python/apache_beam/runners/worker/opcounters.py
index 56ce0db..2bb15fa 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters.py
@@ -25,6 +25,8 @@ import random
 
 from apache_beam.utils.counters import Counter
 
+# This module is experimental. No backwards-compatibility guarantees.
+
 
 class SumAccumulator(object):
   """Accumulator for collecting byte counts."""

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/worker/operation_specs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/operation_specs.py b/sdks/python/apache_beam/runners/worker/operation_specs.py
index 977e165..c03d9a2 100644
--- a/sdks/python/apache_beam/runners/worker/operation_specs.py
+++ b/sdks/python/apache_beam/runners/worker/operation_specs.py
@@ -25,6 +25,8 @@ import collections
 
 from apache_beam import coders
 
+# This module is experimental. No backwards-compatibility guarantees.
+
 
 def build_worker_instruction(*args):
   """Create an object representing a ParallelInstruction protobuf.

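build_worker_instruction itself only gains the experimental note, but its
pattern is a handy one: manufacture a small named record class per instruction
type. One way to sketch that, assuming a namedtuple-based factory (illustrative
only, not the SDK implementation):

    import collections

    def build_spec_class(name, fields):
        # Illustrative: an immutable record type with all fields optional.
        cls = collections.namedtuple(name, fields)
        cls.__new__.__defaults__ = (None,) * len(fields)
        return cls

    WorkerRead = build_spec_class('WorkerRead', ['source', 'output_coders'])
    op = WorkerRead(source='<source spec>')  # output_coders defaults to None
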
http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 6907f6e..596bb90 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -40,6 +40,10 @@ from apache_beam.utils import counters
 from apache_beam.runners.api import beam_fn_api_pb2
 from apache_beam.runners.worker import operation_specs
 from apache_beam.runners.worker import operations
+
+# This module is experimental. No backwards-compatibility guarantees.
+
+
 try:
   from apache_beam.runners.worker import statesampler
 except ImportError:

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index 28828c3..b891779 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -28,6 +28,8 @@ from apache_beam.runners.api import beam_fn_api_pb2
 from apache_beam.runners.worker.log_handler import FnApiLogRecordHandler
 from apache_beam.runners.worker.sdk_worker import SdkHarness
 
+# This module is experimental. No backwards-compatibility guarantees.
+
 
 def main(unused_argv):
   """Main entry point for SDK Fn Harness."""

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/worker/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py
index 3bac3d9..bdf9f4e 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -26,6 +26,8 @@ import traceback
 from apache_beam.io import iobase
 from apache_beam.transforms import window
 
+# This module is experimental. No backwards-compatibility guarantees.
+
 
 # Maximum number of reader threads for reading side input sources, per side
 # input.

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/runners/worker/statesampler_fake.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fake.py b/sdks/python/apache_beam/runners/worker/statesampler_fake.py
index efd7f2d..88ace8c 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_fake.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_fake.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+# This module is experimental. No backwards-compatibility guarantees.
+
 
 class StateSampler(object):
 

http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index e37a387..d42115c 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -227,7 +227,9 @@ def _fn_takes_side_inputs(fn):
 
 
 class CallableWrapperDoFn(DoFn):
-  """A DoFn (function) object wrapping a callable object.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A DoFn (function) object wrapping a callable object.
 
   The purpose of this class is to conveniently wrap simple functions and use
   them in transforms.
@@ -410,7 +412,9 @@ class CombineFn(WithTypeHints, HasDisplayData):
 
 
 class CallableWrapperCombineFn(CombineFn):
-  """A CombineFn (function) object wrapping a callable object.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A CombineFn (function) object wrapping a callable object.
 
   The purpose of this class is to conveniently wrap simple functions and use
   them in Combine transforms.
@@ -537,7 +541,9 @@ class PartitionFn(WithTypeHints):
 
 
 class CallableWrapperPartitionFn(PartitionFn):
-  """A PartitionFn object wrapping a callable object.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A PartitionFn object wrapping a callable object.
 
   Instances of this class wrap simple functions for use in Partition operations.
   """

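These CallableWrapper*Fn classes are the internal glue that lets users hand a
plain function to a transform; the SDK wraps it on their behalf, which is why
the classes must stay importable even though they are now documented as
internal. From the user's side nothing changes:

    import apache_beam as beam

    with beam.Pipeline() as p:
        (p
         | beam.Create([1, 2, 3])
         # The lambda below is wrapped in a CallableWrapperDoFn internally.
         | beam.Map(lambda x: x * 2)
         # A plain callable combiner is wrapped in CallableWrapperCombineFn.
         | beam.CombineGlobally(sum))
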
http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 526291c..fb79b19 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -125,7 +125,7 @@ class GetPValues(_PValueishTransform):
       super(GetPValues, self).visit(node, pvalues)
 
 
-class ZipPValues(_PValueishTransform):
+class _ZipPValues(_PValueishTransform):
   """Pairs each PValue in a pvalueish with a value in a parallel sibling.
 
   Sibling should have the same nested structure as pvalueish.  Leaves in
@@ -148,7 +148,7 @@ class ZipPValues(_PValueishTransform):
     elif isinstance(pvalueish, (pvalue.PValue, pvalue.DoOutputsTuple)):
       pairs.append((context, pvalueish, sibling))
     else:
-      super(ZipPValues, self).visit(pvalueish, sibling, pairs, context)
+      super(_ZipPValues, self).visit(pvalueish, sibling, pairs, context)
 
   def visit_list(self, pvalueish, sibling, pairs, context):
     if isinstance(sibling, (list, tuple)):
@@ -264,7 +264,7 @@ class PTransform(WithTypeHints, HasDisplayData):
               self.__class__, input_or_output))
     root_hint = (
         arg_hints[0] if len(arg_hints) == 1 else arg_hints or kwarg_hints)
-    for context, pvalue_, hint in ZipPValues().visit(pvalueish, root_hint):
+    for context, pvalue_, hint in _ZipPValues().visit(pvalueish, root_hint):
       if pvalue_.element_type is None:
         # TODO(robertwb): It's a bug that we ever get here. (typecheck)
         continue
@@ -341,7 +341,7 @@ class PTransform(WithTypeHints, HasDisplayData):
   def __or__(self, right):
     """Used to compose PTransforms, e.g., ptransform1 | ptransform2."""
     if isinstance(right, PTransform):
-      return ChainedPTransform(self, right)
+      return _ChainedPTransform(self, right)
     return NotImplemented
 
   def __ror__(self, left, label=None):
@@ -453,10 +453,10 @@ PTransform.register_urn(
     PTransform.from_runner_api_parameter)
 
 
-class ChainedPTransform(PTransform):
+class _ChainedPTransform(PTransform):
 
   def __init__(self, *parts):
-    super(ChainedPTransform, self).__init__(label=self._chain_label(parts))
+    super(_ChainedPTransform, self).__init__(label=self._chain_label(parts))
     self._parts = parts
 
   def _chain_label(self, parts):
@@ -466,7 +466,7 @@ class ChainedPTransform(PTransform):
     if isinstance(right, PTransform):
       # Create a flat list rather than a nested tree of composite
       # transforms for better monitoring, etc.
-      return ChainedPTransform(*(self._parts + (right,)))
+      return _ChainedPTransform(*(self._parts + (right,)))
     return NotImplemented
 
   def expand(self, pval):

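As the __or__ docstring above notes, PTransforms compose with | before any
PCollection is involved; the private _ChainedPTransform keeps the parts as a
flat list (rather than a nested tree) so monitoring sees a single chain. For
example:

    import apache_beam as beam

    # Composes into one chained transform; applying it applies Map then
    # Filter in order, under one concatenated label.
    double_then_keep_big = (
        beam.Map(lambda x: x * 2) | beam.Filter(lambda x: x > 2))

    with beam.Pipeline() as p:
        p | beam.Create([1, 2, 3]) | double_then_keep_big
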
http://git-wip-us.apache.org/repos/asf/beam/blob/cc3d07aa/sdks/python/apache_beam/transforms/trigger.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 97240d3..2cb7ce3 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -47,7 +47,7 @@ class AccumulationMode(object):
   # RETRACTING = 3
 
 
-class StateTag(object):
+class _StateTag(object):
   """An identifier used to store and retrieve typed, combinable state.
 
   The given tag must be unique for this stage.  If CombineFn is None then
@@ -60,22 +60,22 @@ class StateTag(object):
     self.tag = tag
 
 
-class ValueStateTag(StateTag):
+class _ValueStateTag(_StateTag):
   """StateTag pointing to an element."""
 
   def __repr__(self):
     return 'ValueStateTag(%s)' % (self.tag)
 
   def with_prefix(self, prefix):
-    return ValueStateTag(prefix + self.tag)
+    return _ValueStateTag(prefix + self.tag)
 
 
-class CombiningValueStateTag(StateTag):
+class _CombiningValueStateTag(_StateTag):
   """StateTag pointing to an element, accumulated with a combiner."""
 
   # TODO(robertwb): Also store the coder (perhaps extracted from the combine_fn)
   def __init__(self, tag, combine_fn):
-    super(CombiningValueStateTag, self).__init__(tag)
+    super(_CombiningValueStateTag, self).__init__(tag)
     if not combine_fn:
       raise ValueError('combine_fn must be specified.')
     if not isinstance(combine_fn, core.CombineFn):
@@ -86,22 +86,22 @@ class CombiningValueStateTag(StateTag):
     return 'CombiningValueStateTag(%s, %s)' % (self.tag, self.combine_fn)
 
   def with_prefix(self, prefix):
-    return CombiningValueStateTag(prefix + self.tag, self.combine_fn)
+    return _CombiningValueStateTag(prefix + self.tag, self.combine_fn)
 
 
-class ListStateTag(StateTag):
+class _ListStateTag(_StateTag):
   """StateTag pointing to a list of elements."""
   def __repr__(self):
     return 'ListStateTag(%s)' % self.tag
 
   def with_prefix(self, prefix):
-    return ListStateTag(prefix + self.tag)
+    return _ListStateTag(prefix + self.tag)
 
 
-class WatermarkHoldStateTag(StateTag):
+class _WatermarkHoldStateTag(_StateTag):
 
   def __init__(self, tag, timestamp_combiner_impl):
-    super(WatermarkHoldStateTag, self).__init__(tag)
+    super(_WatermarkHoldStateTag, self).__init__(tag)
     self.timestamp_combiner_impl = timestamp_combiner_impl
 
   def __repr__(self):
@@ -109,8 +109,8 @@ class WatermarkHoldStateTag(StateTag):
                                               self.timestamp_combiner_impl)
 
   def with_prefix(self, prefix):
-    return WatermarkHoldStateTag(prefix + self.tag,
-                                 self.timestamp_combiner_impl)
+    return _WatermarkHoldStateTag(prefix + self.tag,
+                                  self.timestamp_combiner_impl)
 
 
 # pylint: disable=unused-argument
@@ -251,7 +251,7 @@ class AfterWatermark(TriggerFn):
       late: if not None, a speculative trigger to repeatedly evaluate after
         the watermark passes the end of the window
   """
-  LATE_TAG = CombiningValueStateTag('is_late', any)
+  LATE_TAG = _CombiningValueStateTag('is_late', any)
 
   def __init__(self, early=None, late=None):
     self.early = Repeatedly(early) if early else None
@@ -355,7 +355,7 @@ class AfterWatermark(TriggerFn):
 class AfterCount(TriggerFn):
   """Fire when there are at least count elements in this window pane."""
 
-  COUNT_TAG = CombiningValueStateTag('count', combiners.CountCombineFn())
+  COUNT_TAG = _CombiningValueStateTag('count', combiners.CountCombineFn())
 
   def __init__(self, count):
     self.count = count
@@ -432,7 +432,7 @@ class Repeatedly(TriggerFn):
             subtrigger=self.underlying.to_runner_api(context)))
 
 
-class ParallelTriggerFn(TriggerFn):
+class _ParallelTriggerFn(TriggerFn):
 
   __metaclass__ = ABCMeta
 
@@ -506,7 +506,7 @@ class ParallelTriggerFn(TriggerFn):
       raise NotImplementedError(self)
 
 
-class AfterAny(ParallelTriggerFn):
+class AfterAny(_ParallelTriggerFn):
   """Fires when any subtrigger fires.
 
   Also finishes when any subtrigger finishes.
@@ -514,7 +514,7 @@ class AfterAny(ParallelTriggerFn):
   combine_op = any
 
 
-class AfterAll(ParallelTriggerFn):
+class AfterAll(_ParallelTriggerFn):
   """Fires when all subtriggers have fired.
 
   Also finishes when all subtriggers have finished.
@@ -524,7 +524,7 @@ class AfterAll(ParallelTriggerFn):
 
 class AfterEach(TriggerFn):
 
-  INDEX_TAG = CombiningValueStateTag('index', (
+  INDEX_TAG = _CombiningValueStateTag('index', (
       lambda indices: 0 if not indices else max(indices)))
 
   def __init__(self, *triggers):
@@ -711,7 +711,7 @@ class MergeableStateAdapter(SimpleState):
   # TODO(robertwb): A similar indirection could be used for sliding windows
   # or other window_fns when a single element typically belongs to many windows.
 
-  WINDOW_IDS = ValueStateTag('window_ids')
+  WINDOW_IDS = _ValueStateTag('window_ids')
 
   def __init__(self, raw_state):
     self.raw_state = raw_state
@@ -726,7 +726,7 @@ class MergeableStateAdapter(SimpleState):
       self.raw_state.clear_timer(window_id, name, time_domain)
 
   def add_state(self, window, tag, value):
-    if isinstance(tag, ValueStateTag):
+    if isinstance(tag, _ValueStateTag):
       raise ValueError(
           'Merging requested for non-mergeable state tag: %r.' % tag)
     self.raw_state.add_state(self._get_id(window), tag, value)
@@ -734,10 +734,10 @@ class MergeableStateAdapter(SimpleState):
   def get_state(self, window, tag):
     values = [self.raw_state.get_state(window_id, tag)
               for window_id in self._get_ids(window)]
-    if isinstance(tag, ValueStateTag):
+    if isinstance(tag, _ValueStateTag):
       raise ValueError(
           'Merging requested for non-mergeable state tag: %r.' % tag)
-    elif isinstance(tag, CombiningValueStateTag):
+    elif isinstance(tag, _CombiningValueStateTag):
       # TODO(robertwb): Strip combine_fn.extract_output from raw_state tag.
       if not values:
         accumulator = tag.combine_fn.create_accumulator()
@@ -747,9 +747,9 @@ class MergeableStateAdapter(SimpleState):
         accumulator = tag.combine_fn.merge_accumulators(values)
         # TODO(robertwb): Store the merged value in the first tag.
       return tag.combine_fn.extract_output(accumulator)
-    elif isinstance(tag, ListStateTag):
+    elif isinstance(tag, _ListStateTag):
       return [v for vs in values for v in vs]
-    elif isinstance(tag, WatermarkHoldStateTag):
+    elif isinstance(tag, _WatermarkHoldStateTag):
       return tag.timestamp_combiner_impl.combine_all(values)
     else:
       raise ValueError('Invalid tag.', tag)
@@ -904,15 +904,15 @@ class GeneralTriggerDriver(TriggerDriver):
 
   Suitable for all variants of Windowing.
   """
-  ELEMENTS = ListStateTag('elements')
-  TOMBSTONE = CombiningValueStateTag('tombstone', combiners.CountCombineFn())
+  ELEMENTS = _ListStateTag('elements')
+  TOMBSTONE = _CombiningValueStateTag('tombstone', combiners.CountCombineFn())
 
   def __init__(self, windowing):
     self.window_fn = windowing.windowfn
     self.timestamp_combiner_impl = TimestampCombiner.get_impl(
         windowing.timestamp_combiner, self.window_fn)
     # pylint: disable=invalid-name
-    self.WATERMARK_HOLD = WatermarkHoldStateTag(
+    self.WATERMARK_HOLD = _WatermarkHoldStateTag(
         'watermark', self.timestamp_combiner_impl)
     # pylint: enable=invalid-name
     self.trigger_fn = windowing.triggerfn
@@ -1035,7 +1035,7 @@ class InMemoryUnmergedState(UnmergedState):
     self.defensive_copy = defensive_copy
 
   def set_global_state(self, tag, value):
-    assert isinstance(tag, ValueStateTag)
+    assert isinstance(tag, _ValueStateTag)
     if self.defensive_copy:
       value = copy.deepcopy(value)
     self.global_state[tag.tag] = value
@@ -1055,26 +1055,26 @@ class InMemoryUnmergedState(UnmergedState):
   def add_state(self, window, tag, value):
     if self.defensive_copy:
       value = copy.deepcopy(value)
-    if isinstance(tag, ValueStateTag):
+    if isinstance(tag, _ValueStateTag):
       self.state[window][tag.tag] = value
-    elif isinstance(tag, CombiningValueStateTag):
+    elif isinstance(tag, _CombiningValueStateTag):
       self.state[window][tag.tag].append(value)
-    elif isinstance(tag, ListStateTag):
+    elif isinstance(tag, _ListStateTag):
       self.state[window][tag.tag].append(value)
-    elif isinstance(tag, WatermarkHoldStateTag):
+    elif isinstance(tag, _WatermarkHoldStateTag):
       self.state[window][tag.tag].append(value)
     else:
       raise ValueError('Invalid tag.', tag)
 
   def get_state(self, window, tag):
     values = self.state[window][tag.tag]
-    if isinstance(tag, ValueStateTag):
+    if isinstance(tag, _ValueStateTag):
       return values
-    elif isinstance(tag, CombiningValueStateTag):
+    elif isinstance(tag, _CombiningValueStateTag):
       return tag.combine_fn.apply(values)
-    elif isinstance(tag, ListStateTag):
+    elif isinstance(tag, _ListStateTag):
       return values
-    elif isinstance(tag, WatermarkHoldStateTag):
+    elif isinstance(tag, _WatermarkHoldStateTag):
       return tag.timestamp_combiner_impl.combine_all(values)
     else:
       raise ValueError('Invalid tag.', tag)

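All of the state tags above become private, while the trigger classes built on
them (AfterWatermark, AfterCount, AfterAny, AfterAll, AfterEach, Repeatedly)
stay public, so user-facing windowing configuration is untouched. A typical
configuration for reference (a sketch; the window size and count are
illustrative):

    import apache_beam as beam
    from apache_beam.transforms import trigger, window

    windowed = beam.WindowInto(
        window.FixedWindows(60),
        # Fire speculatively every 100 elements, then once at the watermark.
        trigger=trigger.AfterWatermark(early=trigger.AfterCount(100)),
        accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
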

[2/2] beam git commit: This closes #3036

Posted by ch...@apache.org.
This closes #3036


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1fafaa84
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1fafaa84
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1fafaa84

Branch: refs/heads/master
Commit: 1fafaa8465f88b17dc565cbfd1eb94f2e20c52ff
Parents: b1a2dbe cc3d07a
Author: chamikara@google.com <ch...@google.com>
Authored: Wed May 10 01:44:56 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Wed May 10 01:44:56 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.py    | 57 ++++++++++++----
 sdks/python/apache_beam/coders/observable.py    |  4 +-
 sdks/python/apache_beam/coders/slow_stream.py   | 16 +++--
 sdks/python/apache_beam/coders/typecoders.py    |  4 +-
 sdks/python/apache_beam/internal/gcp/auth.py    | 18 +++--
 .../apache_beam/internal/gcp/json_value.py      | 12 +++-
 sdks/python/apache_beam/internal/pickler.py     | 26 ++++---
 sdks/python/apache_beam/internal/util.py        | 16 +++--
 sdks/python/apache_beam/io/concat_source.py     |  8 ++-
 sdks/python/apache_beam/metrics/cells.py        | 32 ++++++---
 sdks/python/apache_beam/metrics/execution.py    |  7 +-
 .../apache_beam/runners/api/beam_fn_api_pb2.py  |  2 +
 .../runners/api/beam_fn_api_pb2_grpc.py         |  2 +
 .../runners/api/beam_runner_api_pb2.py          |  2 +
 sdks/python/apache_beam/runners/common.pxd      |  4 +-
 sdks/python/apache_beam/runners/common.py       | 28 +++++---
 .../runners/dataflow/internal/apiclient.py      |  4 +-
 .../runners/dataflow/internal/dependency.py     | 16 +++--
 .../runners/dataflow/internal/names.py          |  8 ++-
 .../runners/dataflow/native_io/iobase.py        |  2 +
 .../runners/direct/bundle_factory.py            | 16 +++--
 sdks/python/apache_beam/runners/direct/clock.py |  5 +-
 .../consumer_tracking_pipeline_visitor.py       |  4 +-
 .../apache_beam/runners/direct/executor.py      | 63 +++++++++--------
 .../runners/direct/transform_evaluator.py       |  8 ++-
 .../runners/direct/transform_result.py          |  4 +-
 .../runners/direct/watermark_manager.py         | 10 +--
 .../apache_beam/runners/pipeline_context.py     |  4 +-
 .../runners/portability/fn_api_runner.py        |  2 +
 .../portability/maptask_executor_runner.py      |  2 +
 sdks/python/apache_beam/runners/runner.py       |  8 ++-
 .../apache_beam/runners/worker/data_plane.py    |  2 +
 .../apache_beam/runners/worker/log_handler.py   |  2 +
 .../python/apache_beam/runners/worker/logger.py |  2 +
 .../apache_beam/runners/worker/opcounters.py    |  2 +
 .../runners/worker/operation_specs.py           |  2 +
 .../apache_beam/runners/worker/sdk_worker.py    |  4 ++
 .../runners/worker/sdk_worker_main.py           |  2 +
 .../apache_beam/runners/worker/sideinputs.py    |  2 +
 .../runners/worker/statesampler_fake.py         |  2 +
 sdks/python/apache_beam/transforms/core.py      | 12 +++-
 .../python/apache_beam/transforms/ptransform.py | 14 ++--
 sdks/python/apache_beam/transforms/trigger.py   | 72 ++++++++++----------
 43 files changed, 346 insertions(+), 166 deletions(-)
----------------------------------------------------------------------