Posted to commits@beam.apache.org by al...@apache.org on 2017/05/12 00:11:55 UTC

[01/19] beam git commit: [BEAM-1345] Clearly delineate public api in apache_beam/typehints.

Repository: beam
Updated Branches:
  refs/heads/release-2.0.0 65aa0ffd3 -> 1cc32c65e


[BEAM-1345] Clearly delineate public api in apache_beam/typehints.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/36fcd36c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/36fcd36c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/36fcd36c

Branch: refs/heads/release-2.0.0
Commit: 36fcd36cb779592bc3964d4949a70df5a1cc1428
Parents: a6543ab
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu May 11 12:54:13 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py              |  2 +-
 sdks/python/apache_beam/transforms/core.py       |  8 ++++----
 sdks/python/apache_beam/transforms/ptransform.py | 14 +++++++-------
 sdks/python/apache_beam/typehints/decorators.py  | 17 +++++++++++++----
 sdks/python/apache_beam/typehints/opcodes.py     |  2 ++
 .../apache_beam/typehints/trivial_inference.py   |  2 ++
 sdks/python/apache_beam/typehints/typecheck.py   | 15 +++++++++------
 sdks/python/apache_beam/typehints/typehints.py   | 18 ++++++++++++++++++
 .../apache_beam/typehints/typehints_test.py      | 19 ++++++++++---------
 9 files changed, 66 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
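In effect, internal SDK modules stop importing non-public helpers through the apache_beam.typehints package facade and instead import them from the concrete submodules, while user-facing hints stay importable from the package. A minimal sketch of the resulting import style (names taken from the diffs below; not an exhaustive list):

    # Public, user-facing hints remain available from the package itself:
    from apache_beam.typehints import Any, Iterable, KV, Union

    # Internal helpers are now imported from the modules that define them:
    from apache_beam.typehints.typehints import is_consistent_with
    from apache_beam.typehints.decorators import get_type_hints, WithTypeHints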


http://git-wip-us.apache.org/repos/asf/beam/blob/36fcd36c/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index ec8dde4..79480d7 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -53,11 +53,11 @@ import shutil
 import tempfile
 
 from apache_beam import pvalue
-from apache_beam import typehints
 from apache_beam.internal import pickler
 from apache_beam.runners import create_runner
 from apache_beam.runners import PipelineRunner
 from apache_beam.transforms import ptransform
+from apache_beam.typehints import typehints
 from apache_beam.typehints import TypeCheckError
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions

http://git-wip-us.apache.org/repos/asf/beam/blob/36fcd36c/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index a1964cf..abe699f 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -40,15 +40,15 @@ from apache_beam.transforms.window import TimestampedValue
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.transforms.window import WindowFn
 from apache_beam.typehints import Any
-from apache_beam.typehints import get_type_hints
-from apache_beam.typehints import is_consistent_with
 from apache_beam.typehints import Iterable
 from apache_beam.typehints import KV
 from apache_beam.typehints import trivial_inference
-from apache_beam.typehints import TypeCheckError
 from apache_beam.typehints import Union
-from apache_beam.typehints import WithTypeHints
+from apache_beam.typehints.decorators import get_type_hints
+from apache_beam.typehints.decorators import TypeCheckError
+from apache_beam.typehints.decorators import WithTypeHints
 from apache_beam.typehints.trivial_inference import element_type
+from apache_beam.typehints.typehints import is_consistent_with
 from apache_beam.utils import urns
 from apache_beam.options.pipeline_options import TypeOptions
 

http://git-wip-us.apache.org/repos/asf/beam/blob/36fcd36c/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index d1f9835..8898c36 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -46,16 +46,16 @@ from google.protobuf import wrappers_pb2
 
 from apache_beam import error
 from apache_beam import pvalue
-from apache_beam import typehints
 from apache_beam.internal import pickler
 from apache_beam.internal import util
 from apache_beam.transforms.display import HasDisplayData
 from apache_beam.transforms.display import DisplayDataItem
-from apache_beam.typehints import getcallargs_forhints
-from apache_beam.typehints import TypeCheckError
-from apache_beam.typehints import validate_composite_type_param
-from apache_beam.typehints import WithTypeHints
+from apache_beam.typehints import typehints
+from apache_beam.typehints.decorators import getcallargs_forhints
+from apache_beam.typehints.decorators import TypeCheckError
+from apache_beam.typehints.decorators import WithTypeHints
 from apache_beam.typehints.trivial_inference import instance_to_type
+from apache_beam.typehints.typehints import validate_composite_type_param
 from apache_beam.utils import proto_utils
 from apache_beam.utils import urns
 
@@ -491,7 +491,7 @@ class PTransformWithSideInputs(PTransform):
   """
 
   def __init__(self, fn, *args, **kwargs):
-    if isinstance(fn, type) and issubclass(fn, typehints.WithTypeHints):
+    if isinstance(fn, type) and issubclass(fn, WithTypeHints):
       # Don't treat Fn class objects as callables.
       raise ValueError('Use %s() not %s.' % (fn.__name__, fn.__name__))
     self.fn = self.make_fn(fn)
@@ -577,7 +577,7 @@ class PTransformWithSideInputs(PTransform):
           continue
         if not typehints.is_consistent_with(
             bindings.get(arg, typehints.Any), hint):
-          raise typehints.TypeCheckError(
+          raise TypeCheckError(
               'Type hint violation for \'%s\': requires %s but got %s for %s'
               % (self.label, hint, bindings[arg], arg))
 

http://git-wip-us.apache.org/repos/asf/beam/blob/36fcd36c/sdks/python/apache_beam/typehints/decorators.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/decorators.py b/sdks/python/apache_beam/typehints/decorators.py
index 4eabdba..6ed388a 100644
--- a/sdks/python/apache_beam/typehints/decorators.py
+++ b/sdks/python/apache_beam/typehints/decorators.py
@@ -86,11 +86,20 @@ defined, or before importing a module containing type-hinted functions.
 import inspect
 import types
 
-from apache_beam.typehints import check_constraint
-from apache_beam.typehints import CompositeTypeHintError
-from apache_beam.typehints import SimpleTypeHintError
 from apache_beam.typehints import typehints
-from apache_beam.typehints import validate_composite_type_param
+from apache_beam.typehints.typehints import check_constraint
+from apache_beam.typehints.typehints import CompositeTypeHintError
+from apache_beam.typehints.typehints import SimpleTypeHintError
+from apache_beam.typehints.typehints import validate_composite_type_param
+
+
+__all__ = [
+    'with_input_types',
+    'with_output_types',
+    'WithTypeHints',
+    'TypeCheckError',
+]
+
 
 # This is missing in the builtin types module.  str.upper is arbitrary, any
 # method on a C-implemented type will do.

http://git-wip-us.apache.org/repos/asf/beam/blob/36fcd36c/sdks/python/apache_beam/typehints/opcodes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/opcodes.py b/sdks/python/apache_beam/typehints/opcodes.py
index 042acc0..83f444c 100644
--- a/sdks/python/apache_beam/typehints/opcodes.py
+++ b/sdks/python/apache_beam/typehints/opcodes.py
@@ -23,6 +23,8 @@ FrameState object, the second the integer opcode argument.
 
 Bytecodes with more complicated behavior (e.g. modifying the program counter)
 are handled inline rather than here.
+
+For internal use only; no backwards-compatibility guarantees.
 """
 import types
 

http://git-wip-us.apache.org/repos/asf/beam/blob/36fcd36c/sdks/python/apache_beam/typehints/trivial_inference.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/trivial_inference.py b/sdks/python/apache_beam/typehints/trivial_inference.py
index 4581aa1..977ea06 100644
--- a/sdks/python/apache_beam/typehints/trivial_inference.py
+++ b/sdks/python/apache_beam/typehints/trivial_inference.py
@@ -16,6 +16,8 @@
 #
 
 """Trivial type inference for simple functions.
+
+For internal use only; no backwards-compatibility guarantees.
 """
 import __builtin__
 import collections

http://git-wip-us.apache.org/repos/asf/beam/blob/36fcd36c/sdks/python/apache_beam/typehints/typecheck.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py
index 160d104..09b73f9 100644
--- a/sdks/python/apache_beam/typehints/typecheck.py
+++ b/sdks/python/apache_beam/typehints/typecheck.py
@@ -15,7 +15,10 @@
 # limitations under the License.
 #
 
-"""Runtime type checking support."""
+"""Runtime type checking support.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
 
 import collections
 import inspect
@@ -25,13 +28,13 @@ import types
 from apache_beam.pvalue import TaggedOutput
 from apache_beam.transforms.core import DoFn
 from apache_beam.transforms.window import WindowedValue
-from apache_beam.typehints import check_constraint
-from apache_beam.typehints import CompositeTypeHintError
-from apache_beam.typehints import GeneratorWrapper
-from apache_beam.typehints import SimpleTypeHintError
-from apache_beam.typehints import TypeCheckError
 from apache_beam.typehints.decorators import _check_instance_type
 from apache_beam.typehints.decorators import getcallargs_forhints
+from apache_beam.typehints.decorators import GeneratorWrapper
+from apache_beam.typehints.decorators import TypeCheckError
+from apache_beam.typehints.typehints import check_constraint
+from apache_beam.typehints.typehints import CompositeTypeHintError
+from apache_beam.typehints.typehints import SimpleTypeHintError
 
 
 class AbstractDoFnWrapper(DoFn):

http://git-wip-us.apache.org/repos/asf/beam/blob/36fcd36c/sdks/python/apache_beam/typehints/typehints.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py
index 9b41adb..cc430be 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -68,6 +68,24 @@ import copy
 import types
 
 
+__all__ = [
+    'Any',
+    'Union',
+    'Optional',
+    'Tuple',
+    'Tuple',
+    'List',
+    'KV',
+    'Dict',
+    'Set',
+    'Iterable',
+    'Iterator',
+    'Generator',
+    'WindowedValue',
+    'TypeVariable',
+]
+
+
 # A set of the built-in Python types we don't support, guiding the users
 # to templated (upper-case) versions instead.
 DISALLOWED_PRIMITIVE_TYPES = (list, set, tuple, dict)

http://git-wip-us.apache.org/repos/asf/beam/blob/36fcd36c/sdks/python/apache_beam/typehints/typehints_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typehints_test.py b/sdks/python/apache_beam/typehints/typehints_test.py
index f90b5e9..f1b92e0 100644
--- a/sdks/python/apache_beam/typehints/typehints_test.py
+++ b/sdks/python/apache_beam/typehints/typehints_test.py
@@ -21,9 +21,8 @@ import inspect
 import unittest
 
 
-import apache_beam.typehints as typehints
+import apache_beam.typehints.typehints as typehints
 from apache_beam.typehints import Any
-from apache_beam.typehints import is_consistent_with
 from apache_beam.typehints import Tuple
 from apache_beam.typehints import TypeCheckError
 from apache_beam.typehints import Union
@@ -34,6 +33,8 @@ from apache_beam.typehints.decorators import _interleave_type_check
 from apache_beam.typehints.decorators import _positional_arg_hints
 from apache_beam.typehints.decorators import get_type_hints
 from apache_beam.typehints.decorators import getcallargs_forhints
+from apache_beam.typehints.decorators import GeneratorWrapper
+from apache_beam.typehints.typehints import is_consistent_with
 
 
 def check_or_interleave(hint, value, var):
@@ -712,7 +713,7 @@ class TestGeneratorWrapper(TypeHintTestCase):
 
     l = []
     interleave_func = lambda x: l.append(x)
-    wrapped_gen = typehints.GeneratorWrapper(count(4), interleave_func)
+    wrapped_gen = GeneratorWrapper(count(4), interleave_func)
 
     # Should function as a normal generator.
     self.assertEqual(0, next(wrapped_gen))
@@ -1032,12 +1033,12 @@ class CombinedReturnsAndTakesTestCase(TypeHintTestCase):
 class DecoratorHelpers(TypeHintTestCase):
 
   def test_hint_helper(self):
-    self.assertTrue(typehints.is_consistent_with(Any, int))
-    self.assertTrue(typehints.is_consistent_with(int, Any))
-    self.assertTrue(typehints.is_consistent_with(str, object))
-    self.assertFalse(typehints.is_consistent_with(object, str))
-    self.assertTrue(typehints.is_consistent_with(str, Union[str, int]))
-    self.assertFalse(typehints.is_consistent_with(Union[str, int], str))
+    self.assertTrue(is_consistent_with(Any, int))
+    self.assertTrue(is_consistent_with(int, Any))
+    self.assertTrue(is_consistent_with(str, object))
+    self.assertFalse(is_consistent_with(object, str))
+    self.assertTrue(is_consistent_with(str, Union[str, int]))
+    self.assertFalse(is_consistent_with(Union[str, int], str))
 
   def test_positional_arg_hints(self):
     self.assertEquals(typehints.Any, _positional_arg_hints('x', {}))


[12/19] beam git commit: Mark internal modules in python datastoreio

Posted by al...@apache.org.
Mark internal modules in python datastoreio


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0a0cc2d6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0a0cc2d6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0a0cc2d6

Branch: refs/heads/release-2.0.0
Commit: 0a0cc2d6bd5c3c0e6ab9d1388d5d3c8dc5ca7760
Parents: 5b09511
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Thu May 11 11:33:24 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py | 7 +++++--
 sdks/python/apache_beam/io/gcp/datastore/v1/helper.py         | 5 ++++-
 2 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0a0cc2d6/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py b/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
index bc4d07f..0caf6d6 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
@@ -15,7 +15,11 @@
 # limitations under the License.
 #
 
-"""Fake datastore used for unit testing."""
+"""Fake datastore used for unit testing.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
+
 import uuid
 
 # Protect against environments where datastore library is not available.
@@ -27,7 +31,6 @@ except ImportError:
   pass
 # pylint: enable=wrong-import-order, wrong-import-position
 
-
 def create_run_query(entities, batch_size):
   """A fake datastore run_query method that returns entities in batches.
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0a0cc2d6/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
index a61884f..9e2c053 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
@@ -15,7 +15,10 @@
 # limitations under the License.
 #
 
-"""Cloud Datastore helper functions."""
+"""Cloud Datastore helper functions.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
 import sys
 
 # Protect against environments where datastore library is not available.


[08/19] beam git commit: [BEAM-1340] Add __all__ tags to modules in package apache_beam/transforms

Posted by al...@apache.org.
[BEAM-1340] Add __all__ tags to modules in package apache_beam/transforms


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6d02da03
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6d02da03
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6d02da03

Branch: refs/heads/release-2.0.0
Commit: 6d02da03981f08037538e63e9246efef63a36ea0
Parents: 0c784f9
Author: Charles Chen <cc...@google.com>
Authored: Wed May 10 23:06:36 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700

----------------------------------------------------------------------
 .../examples/snippets/snippets_test.py          |  2 +-
 .../runners/dataflow/dataflow_runner.py         |  2 +-
 .../runners/dataflow/dataflow_runner_test.py    |  5 +++--
 sdks/python/apache_beam/transforms/core.py      | 20 ++++++++++++++++++++
 .../apache_beam/transforms/cy_combiners.py      |  5 ++++-
 .../python/apache_beam/transforms/ptransform.py |  9 +++++++++
 .../apache_beam/transforms/ptransform_test.py   | 11 ++++++-----
 .../python/apache_beam/transforms/sideinputs.py |  2 ++
 sdks/python/apache_beam/transforms/timeutil.py  |  5 +++++
 sdks/python/apache_beam/transforms/trigger.py   | 14 ++++++++++++++
 sdks/python/apache_beam/transforms/window.py    | 15 +++++++++++++++
 11 files changed, 80 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
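The __all__ tags added below declare each module's public surface: they control what "from module import *" exposes and signal to readers and tools which names are public. A minimal, self-contained sketch of the convention (illustrative module and class names, not Beam code):

    # mymodule.py
    __all__ = ['PublicTransform']

    class PublicTransform(object):
      # Part of the public API; exported by name.
      pass

    class _InternalHelper(object):
      # Underscore-prefixed and absent from __all__, so treated as internal.
      pass

    # Client code:
    #   from mymodule import *   # brings in PublicTransform only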


http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 211da24..37cd470 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -892,7 +892,7 @@ class CombineTest(unittest.TestCase):
         unix_timestamp = extract_timestamp_from_log_entry(element)
         # Wrap and emit the current entry and new timestamp in a
         # TimestampedValue.
-        yield beam.TimestampedValue(element, unix_timestamp)
+        yield beam.window.TimestampedValue(element, unix_timestamp)
 
     timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())
     # [END setting_timestamp]

http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 3d8437c..da8de9d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -169,7 +169,7 @@ class DataflowRunner(PipelineRunner):
       def visit_transform(self, transform_node):
         # Imported here to avoid circular dependencies.
         # pylint: disable=wrong-import-order, wrong-import-position
-        from apache_beam import GroupByKey, GroupByKeyOnly
+        from apache_beam.transforms.core import GroupByKey, GroupByKeyOnly
         if isinstance(transform_node.transform, (GroupByKey, GroupByKeyOnly)):
           pcoll = transform_node.inputs[0]
           input_type = pcoll.element_type

http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index b61a683..ac9b028 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -37,6 +37,7 @@ from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeExceptio
 from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayDataItem
+from apache_beam.transforms.core import GroupByKeyOnly
 from apache_beam.typehints import typehints
 
 # Protect against environments where apitools library is not available.
@@ -184,7 +185,7 @@ class DataflowRunnerTest(unittest.TestCase):
     pcoll1 = PCollection(p)
     pcoll2 = PCollection(p)
     pcoll3 = PCollection(p)
-    for transform in [beam.GroupByKeyOnly(), beam.GroupByKey()]:
+    for transform in [GroupByKeyOnly(), beam.GroupByKey()]:
       pcoll1.element_type = None
       pcoll2.element_type = typehints.Any
       pcoll3.element_type = typehints.KV[typehints.Any, typehints.Any]
@@ -198,7 +199,7 @@ class DataflowRunnerTest(unittest.TestCase):
     p = TestPipeline()
     pcoll1 = PCollection(p)
     pcoll2 = PCollection(p)
-    for transform in [beam.GroupByKeyOnly(), beam.GroupByKey()]:
+    for transform in [GroupByKeyOnly(), beam.GroupByKey()]:
       pcoll1.element_type = typehints.TupleSequenceConstraint
       pcoll2.element_type = typehints.Set
       err_msg = "Input to GroupByKey must be of Tuple or Any type"

http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index d42115c..a1964cf 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -53,6 +53,26 @@ from apache_beam.utils import urns
 from apache_beam.options.pipeline_options import TypeOptions
 
 
+__all__ = [
+    'DoFn',
+    'CombineFn',
+    'PartitionFn',
+    'ParDo',
+    'FlatMap',
+    'Map',
+    'Filter',
+    'CombineGlobally',
+    'CombinePerKey',
+    'CombineValues',
+    'GroupByKey',
+    'Partition',
+    'Windowing',
+    'WindowInto',
+    'Flatten',
+    'Create',
+    ]
+
+
 # Type variables
 T = typehints.TypeVariable('T')
 K = typehints.TypeVariable('K')

http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/transforms/cy_combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/cy_combiners.py b/sdks/python/apache_beam/transforms/cy_combiners.py
index f824870..84aee21 100644
--- a/sdks/python/apache_beam/transforms/cy_combiners.py
+++ b/sdks/python/apache_beam/transforms/cy_combiners.py
@@ -15,7 +15,10 @@
 # limitations under the License.
 #
 
-"""A library of basic cythonized CombineFn subclasses."""
+"""A library of basic cythonized CombineFn subclasses.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
 
 from __future__ import absolute_import
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index fb79b19..d1f9835 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -60,6 +60,13 @@ from apache_beam.utils import proto_utils
 from apache_beam.utils import urns
 
 
+__all__ = [
+    'PTransform',
+    'ptransform_fn',
+    'label_from_callable',
+    ]
+
+
 class _PValueishTransform(object):
   """Visitor for PValueish objects.
 
@@ -639,6 +646,8 @@ class CallablePTransform(PTransform):
 def ptransform_fn(fn):
   """A decorator for a function-based PTransform.
 
+  Experimental; no backwards-compatibility guarantees.
+
   Args:
     fn: A function implementing a custom PTransform.
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 137992d..3320d79 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -34,6 +34,7 @@ from apache_beam.options.pipeline_options import TypeOptions
 import apache_beam.pvalue as pvalue
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms import window
+from apache_beam.transforms.core import GroupByKeyOnly
 import apache_beam.transforms.combiners as combine
 from apache_beam.transforms.display import DisplayData, DisplayDataItem
 from apache_beam.transforms.ptransform import PTransform
@@ -579,7 +580,7 @@ class PTransformTest(unittest.TestCase):
     pipeline = TestPipeline()
     pcolls = pipeline | 'A' >> beam.Create(['a', 'b', 'f'])
     with self.assertRaises(typehints.TypeCheckError) as cm:
-      pcolls | 'D' >> beam.GroupByKeyOnly()
+      pcolls | 'D' >> GroupByKeyOnly()
       pipeline.run()
 
     expected_error_prefix = ('Input type hint violation at D: expected '
@@ -1087,7 +1088,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
          | 'Str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
          | ('Pair' >> beam.Map(lambda x: (x, ord(x)))
             .with_output_types(typehints.KV[str, str]))
-         | beam.GroupByKeyOnly())
+         | GroupByKeyOnly())
 
     # Output type should correctly be deduced.
     # GBK-only should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
@@ -1111,7 +1112,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
        | beam.Create([1, 2, 3]).with_output_types(int)
-       | 'F' >> beam.GroupByKeyOnly())
+       | 'F' >> GroupByKeyOnly())
 
     self.assertEqual("Input type hint violation at F: "
                      "expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -1154,7 +1155,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       (self.p
        | 'Nums' >> beam.Create(range(5)).with_output_types(int)
        | 'ModDup' >> beam.Map(lambda x: (x % 2, x))
-       | beam.GroupByKeyOnly())
+       | GroupByKeyOnly())
 
     self.assertEqual('Pipeline type checking is enabled, however no output '
                      'type-hint was found for the PTransform '
@@ -1977,7 +1978,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_gbk_type_inference(self):
     self.assertEqual(
         typehints.Tuple[str, typehints.Iterable[int]],
-        beam.core.GroupByKeyOnly().infer_output_type(typehints.KV[str, int]))
+        GroupByKeyOnly().infer_output_type(typehints.KV[str, int]))
 
   def test_pipeline_inference(self):
     created = self.p | beam.Create(['a', 'b', 'c'])

http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/transforms/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py
index 6ba5311..f10cb92 100644
--- a/sdks/python/apache_beam/transforms/sideinputs.py
+++ b/sdks/python/apache_beam/transforms/sideinputs.py
@@ -17,6 +17,8 @@
 
 """Internal side input transforms and implementations.
 
+For internal use only; no backwards-compatibility guarantees.
+
 Important: this module is an implementation detail and should not be used
 directly by pipeline writers. Instead, users should use the helper methods
 AsSingleton, AsIter, AsList and AsDict in apache_beam.pvalue.

http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/transforms/timeutil.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py
index ba4ef36..c0f9198 100644
--- a/sdks/python/apache_beam/transforms/timeutil.py
+++ b/sdks/python/apache_beam/transforms/timeutil.py
@@ -23,6 +23,11 @@ from abc import ABCMeta
 from abc import abstractmethod
 
 
+__all__ = [
+    'TimeDomain',
+    ]
+
+
 class TimeDomain(object):
   """Time domain for streaming timers."""
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/transforms/trigger.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 2cb7ce3..7de2f85 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -38,6 +38,20 @@ from apache_beam.utils.timestamp import MAX_TIMESTAMP
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
 
 
+__all__ = [
+    'AccumulationMode',
+    'TriggerFn',
+    'DefaultTrigger',
+    'AfterWatermark',
+    'AfterCount',
+    'Repeatedly',
+    'AfterAny',
+    'AfterAll',
+    'AfterEach',
+    'OrFinally',
+    ]
+
+
 class AccumulationMode(object):
   """Controls what to do with data when a trigger fires multiple times.
   """

http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 6d0db3a..94187e0 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -65,6 +65,21 @@ from apache_beam.utils.timestamp import Timestamp
 from apache_beam.utils.windowed_value import WindowedValue
 
 
+__all__ = [
+    'TimestampCombiner',
+    'WindowFn',
+    'BoundedWindow',
+    'IntervalWindow',
+    'TimestampedValue',
+    'GlobalWindow',
+    'NonMergingWindowFn',
+    'GlobalWindows',
+    'FixedWindows',
+    'SlidingWindows',
+    'Sessions',
+    ]
+
+
 # TODO(ccy): revisit naming and semantics once Java Apache Beam finalizes their
 # behavior.
 class TimestampCombiner(object):


[14/19] beam git commit: [BEAM-1345] Mark apache_beam/internal as internal.

Posted by al...@apache.org.
[BEAM-1345] Mark apache_beam/internal as internal.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2bb95fd6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2bb95fd6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2bb95fd6

Branch: refs/heads/release-2.0.0
Commit: 2bb95fd68635ea1932f3a7d7cc59fcef1644deea
Parents: 36fcd36
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu May 11 12:09:28 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:37 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/__init__.py     | 2 ++
 sdks/python/apache_beam/internal/gcp/__init__.py | 2 ++
 2 files changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2bb95fd6/sdks/python/apache_beam/internal/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/__init__.py b/sdks/python/apache_beam/internal/__init__.py
index cce3aca..0bce5d6 100644
--- a/sdks/python/apache_beam/internal/__init__.py
+++ b/sdks/python/apache_beam/internal/__init__.py
@@ -14,3 +14,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
+"""For internal use only; no backwards-compatibility guarantees."""

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb95fd6/sdks/python/apache_beam/internal/gcp/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/gcp/__init__.py b/sdks/python/apache_beam/internal/gcp/__init__.py
index cce3aca..0bce5d6 100644
--- a/sdks/python/apache_beam/internal/gcp/__init__.py
+++ b/sdks/python/apache_beam/internal/gcp/__init__.py
@@ -14,3 +14,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
+"""For internal use only; no backwards-compatibility guarantees."""


[16/19] beam git commit: Fix due to GBKO name change.

Posted by al...@apache.org.
Fix due to GBKO name change.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a6f5888e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a6f5888e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a6f5888e

Branch: refs/heads/release-2.0.0
Commit: a6f5888e42ef34eacfe6384c0de9df9371ae968c
Parents: e12cf0d
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu May 11 15:47:54 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:37 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/runners/portability/maptask_executor_runner.py     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
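Context for the one-line change below: GroupByKeyOnly was evidently renamed to _GroupByKeyOnly (hence the extra underscore in run__GroupByKeyOnly), and runner handler methods are resolved from the transform's class name. A rough sketch of that kind of name-based dispatch (an assumption about the convention for illustration, not the actual PipelineRunner code):

    class _GroupByKeyOnly(object):      # stand-in for the renamed transform
      pass

    class SketchRunner(object):
      def run_transform(self, transform):
        # 'run_' + class name -> 'run__GroupByKeyOnly' for _GroupByKeyOnly.
        handler = getattr(self, 'run_%s' % transform.__class__.__name__)
        return handler(transform)

      def run__GroupByKeyOnly(self, transform):
        return 'grouped'

    SketchRunner().run_transform(_GroupByKeyOnly())   # returns 'grouped'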


http://git-wip-us.apache.org/repos/asf/beam/blob/a6f5888e/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
index 077871e..ddfc4cc 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
@@ -243,7 +243,7 @@ class MapTaskExecutorRunner(PipelineRunner):
         (label, write_sideinput_op))
     return output_buffer
 
-  def run_GroupByKeyOnly(self, transform_node):
+  def run__GroupByKeyOnly(self, transform_node):
     map_task_index, producer_index, output_index = self.outputs[
         transform_node.inputs[0]]
     grouped_element_coder = self._get_coder(transform_node.outputs[None],


[03/19] beam git commit: [BEAM-1345] Mark Pipeline as public.

Posted by al...@apache.org.
[BEAM-1345] Mark Pipeline as public.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0ce25430
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0ce25430
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0ce25430

Branch: refs/heads/release-2.0.0
Commit: 0ce25430ffec49fb2d94271e4af6225cee20388c
Parents: aeeefc1
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu May 11 13:30:32 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py | 7 +++++++
 1 file changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0ce25430/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 83c7287..ec8dde4 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -67,6 +67,9 @@ from apache_beam.options.pipeline_options_validator import PipelineOptionsValida
 from apache_beam.utils.annotations import deprecated
 
 
+__all__ = ['Pipeline']
+
+
 class Pipeline(object):
   """A pipeline object that manages a DAG of PValues and their PTransforms.
 
@@ -182,6 +185,8 @@ class Pipeline(object):
   def visit(self, visitor):
     """Visits depth-first every node of a pipeline's DAG.
 
+    Runner-internal implementation detail; no backwards-compatibility guarantees
+
     Args:
       visitor: PipelineVisitor object whose callbacks will be called for each
         node visited. See PipelineVisitor comments.
@@ -333,6 +338,7 @@ class Pipeline(object):
     return Visitor.ok
 
   def to_runner_api(self):
+    """For internal use only; no backwards-compatibility guarantees."""
     from apache_beam.runners import pipeline_context
     from apache_beam.runners.api import beam_runner_api_pb2
     context = pipeline_context.PipelineContext()
@@ -346,6 +352,7 @@ class Pipeline(object):
 
   @staticmethod
   def from_runner_api(proto, runner, options):
+    """For internal use only; no backwards-compatibility guarantees."""
     p = Pipeline(runner=runner, options=options)
     from apache_beam.runners import pipeline_context
     context = pipeline_context.PipelineContext(proto.components)


[17/19] beam git commit: Move assert_that, equal_to, is_empty to apache_beam.testing.util

Posted by al...@apache.org.
Move assert_that, equal_to, is_empty to apache_beam.testing.util


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2070f118
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2070f118
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2070f118

Branch: refs/heads/release-2.0.0
Commit: 2070f1182d49e3b7b3e9ed8a35173cb165fa5bfb
Parents: d0da682
Author: Charles Chen <cc...@google.com>
Authored: Thu May 11 15:07:30 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:37 2017 -0700

----------------------------------------------------------------------
 .../examples/complete/autocomplete_test.py      |   4 +-
 .../examples/complete/estimate_pi_test.py       |   4 +-
 .../complete/game/hourly_team_score_test.py     |   4 +-
 .../examples/complete/game/user_score_test.py   |   4 +-
 .../apache_beam/examples/complete/tfidf_test.py |   4 +-
 .../complete/top_wikipedia_sessions_test.py     |   4 +-
 .../cookbook/bigquery_side_input_test.py        |   4 +-
 .../cookbook/bigquery_tornadoes_test.py         |   6 +-
 .../examples/cookbook/coders_test.py            |   4 +-
 .../examples/cookbook/combiners_test.py         |   6 +-
 .../examples/cookbook/custom_ptransform_test.py |   4 +-
 .../examples/cookbook/filters_test.py           |  12 ++-
 .../examples/cookbook/mergecontacts.py          |  14 +--
 .../apache_beam/examples/snippets/snippets.py   |  17 +--
 .../examples/snippets/snippets_test.py          |  30 +++---
 .../apache_beam/examples/wordcount_debugging.py |   6 +-
 sdks/python/apache_beam/io/avroio_test.py       |   4 +-
 .../python/apache_beam/io/concat_source_test.py |   4 +-
 .../apache_beam/io/filebasedsource_test.py      |   4 +-
 sdks/python/apache_beam/io/sources_test.py      |   4 +-
 sdks/python/apache_beam/io/textio_test.py       |   5 +-
 sdks/python/apache_beam/io/tfrecordio_test.py   |  24 +++--
 sdks/python/apache_beam/pipeline_test.py        |   4 +-
 .../portability/maptask_executor_runner_test.py |   6 +-
 sdks/python/apache_beam/runners/runner_test.py  |   4 +-
 sdks/python/apache_beam/testing/util.py         | 107 +++++++++++++++++++
 sdks/python/apache_beam/testing/util_test.py    |  50 +++++++++
 .../apache_beam/transforms/combiners_test.py    |   2 +-
 .../apache_beam/transforms/create_test.py       |   3 +-
 .../apache_beam/transforms/ptransform_test.py   |   2 +-
 .../apache_beam/transforms/sideinputs_test.py   |   2 +-
 .../apache_beam/transforms/trigger_test.py      |   2 +-
 sdks/python/apache_beam/transforms/util.py      |  79 --------------
 sdks/python/apache_beam/transforms/util_test.py |  50 ---------
 .../apache_beam/transforms/window_test.py       |   2 +-
 .../transforms/write_ptransform_test.py         |   2 +-
 .../typehints/typed_pipeline_test.py            |   2 +-
 37 files changed, 271 insertions(+), 218 deletions(-)
----------------------------------------------------------------------
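The net effect for test code: the assertion helpers now live in apache_beam.testing.util rather than apache_beam.transforms.util, and the updated call sites below stop spelling them beam.assert_that / beam.equal_to. A minimal usage sketch with the new imports (toy data; the pattern follows the updated tests in this commit):

    import apache_beam as beam
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.util import assert_that
    from apache_beam.testing.util import equal_to

    with TestPipeline() as p:
      result = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * 10)
      assert_that(result, equal_to([10, 20, 30]))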


http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 438633a..378d222 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -22,8 +22,8 @@ import unittest
 import apache_beam as beam
 from apache_beam.examples.complete import autocomplete
 from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class AutocompleteTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
index 12d8379..fd51309 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
@@ -22,8 +22,8 @@ import unittest
 
 from apache_beam.examples.complete import estimate_pi
 from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import BeamAssertException
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import BeamAssertException
 
 
 def in_between(lower, upper):

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
index bd0abca..9c30127 100644
--- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
@@ -23,6 +23,8 @@ import unittest
 import apache_beam as beam
 from apache_beam.examples.complete.game import hourly_team_score
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class HourlyTeamScoreTest(unittest.TestCase):
@@ -44,7 +46,7 @@ class HourlyTeamScoreTest(unittest.TestCase):
                     start_min='2015-11-16-15-20',
                     stop_min='2015-11-16-17-20',
                     window_duration=60))
-      beam.assert_that(result, beam.equal_to([
+      assert_that(result, equal_to([
           ('team1', 18), ('team2', 2), ('team3', 13)]))
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/game/user_score_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score_test.py b/sdks/python/apache_beam/examples/complete/game/user_score_test.py
index 2db53bd..59903d9 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score_test.py
@@ -23,6 +23,8 @@ import unittest
 import apache_beam as beam
 from apache_beam.examples.complete.game import user_score
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class UserScoreTest(unittest.TestCase):
@@ -40,7 +42,7 @@ class UserScoreTest(unittest.TestCase):
     with TestPipeline() as p:
       result = (
           p | beam.Create(UserScoreTest.SAMPLE_DATA) | user_score.UserScore())
-      beam.assert_that(result, beam.equal_to([
+      assert_that(result, equal_to([
           ('user1_team1', 50), ('user2_team2', 2), ('user3_team3', 8),
           ('user4_team3', 5)]))
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index 0e30254..f177dfc 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -26,6 +26,8 @@ import unittest
 import apache_beam as beam
 from apache_beam.examples.complete import tfidf
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 EXPECTED_RESULTS = set([
@@ -57,7 +59,7 @@ class TfIdfTest(unittest.TestCase):
         uri_to_line
         | tfidf.TfIdf()
         | beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
-    beam.assert_that(result, beam.equal_to(EXPECTED_RESULTS))
+    assert_that(result, equal_to(EXPECTED_RESULTS))
     # Run the pipeline. Note that the assert_that above adds to the pipeline
     # a check that the result PCollection contains expected values. To actually
     # trigger the check the pipeline must be run.

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
index 4850c04..5fb6276 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -24,6 +24,8 @@ import unittest
 import apache_beam as beam
 from apache_beam.examples.complete import top_wikipedia_sessions
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class ComputeTopSessionsTest(unittest.TestCase):
@@ -54,7 +56,7 @@ class ComputeTopSessionsTest(unittest.TestCase):
     edits = p | beam.Create(self.EDITS)
     result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
 
-    beam.assert_that(result, beam.equal_to(self.EXPECTED))
+    assert_that(result, equal_to(self.EXPECTED))
     p.run()
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index 1ca25c9..b11dc47 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -23,6 +23,8 @@ import unittest
 import apache_beam as beam
 from apache_beam.examples.cookbook import bigquery_side_input
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class BigQuerySideInputTest(unittest.TestCase):
@@ -42,7 +44,7 @@ class BigQuerySideInputTest(unittest.TestCase):
                                                words_pcoll, ignore_corpus_pcoll,
                                                ignore_word_pcoll)
 
-    beam.assert_that(groups, beam.equal_to(
+    assert_that(groups, equal_to(
         [('A', 'corpus2', 'word2'),
          ('B', 'corpus2', 'word2'),
          ('C', 'corpus2', 'word2')]))

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
index ca7ca9e..c926df8 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
@@ -23,6 +23,8 @@ import unittest
 import apache_beam as beam
 from apache_beam.examples.cookbook import bigquery_tornadoes
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class BigQueryTornadoesTest(unittest.TestCase):
@@ -35,8 +37,8 @@ class BigQueryTornadoesTest(unittest.TestCase):
         {'month': 1, 'day': 3, 'tornado': True},
         {'month': 2, 'day': 1, 'tornado': True}]))
     results = bigquery_tornadoes.count_tornadoes(rows)
-    beam.assert_that(results, beam.equal_to([{'month': 1, 'tornado_count': 2},
-                                             {'month': 2, 'tornado_count': 1}]))
+    assert_that(results, equal_to([{'month': 1, 'tornado_count': 2},
+                                   {'month': 2, 'tornado_count': 1}]))
     p.run().wait_until_finish()
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py
index 35cf252..f71dad8 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py
@@ -23,8 +23,8 @@ import unittest
 import apache_beam as beam
 from apache_beam.examples.cookbook import coders
 from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class CodersTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/combiners_test.py b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
index 45c779f..ee1fb77 100644
--- a/sdks/python/apache_beam/examples/cookbook/combiners_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
@@ -28,6 +28,8 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class CombinersTest(unittest.TestCase):
@@ -49,7 +51,7 @@ class CombinersTest(unittest.TestCase):
         | beam.Create(CombinersTest.SAMPLE_DATA)
         | beam.CombinePerKey(sum))
 
-    beam.assert_that(result, beam.equal_to([('a', 6), ('b', 30), ('c', 100)]))
+    assert_that(result, equal_to([('a', 6), ('b', 30), ('c', 100)]))
     result.pipeline.run()
 
   def test_combine_per_key_with_custom_callable(self):
@@ -65,7 +67,7 @@ class CombinersTest(unittest.TestCase):
         | beam.Create(CombinersTest.SAMPLE_DATA)
         | beam.CombinePerKey(multiply))
 
-    beam.assert_that(result, beam.equal_to([('a', 6), ('b', 200), ('c', 100)]))
+    assert_that(result, equal_to([('a', 6), ('b', 200), ('c', 100)]))
     result.pipeline.run()
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index 2d35d8d..c7c6dba 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -23,8 +23,8 @@ import unittest
 import apache_beam as beam
 from apache_beam.examples.cookbook import custom_ptransform
 from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class CustomCountTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/filters_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters_test.py b/sdks/python/apache_beam/examples/cookbook/filters_test.py
index 44a352f..fd49f93 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters_test.py
@@ -23,6 +23,8 @@ import unittest
 import apache_beam as beam
 from apache_beam.examples.cookbook import filters
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class FiltersTest(unittest.TestCase):
@@ -45,22 +47,22 @@ class FiltersTest(unittest.TestCase):
   def test_basic(self):
     """Test that the correct result is returned for a simple dataset."""
     results = self._get_result_for_month(1)
-    beam.assert_that(
+    assert_that(
         results,
-        beam.equal_to([{'year': 2010, 'month': 1, 'day': 1, 'mean_temp': 3},
-                       {'year': 2012, 'month': 1, 'day': 2, 'mean_temp': 3}]))
+        equal_to([{'year': 2010, 'month': 1, 'day': 1, 'mean_temp': 3},
+                  {'year': 2012, 'month': 1, 'day': 2, 'mean_temp': 3}]))
     results.pipeline.run()
 
   def test_basic_empty(self):
     """Test that the correct empty result is returned for a simple dataset."""
     results = self._get_result_for_month(3)
-    beam.assert_that(results, beam.equal_to([]))
+    assert_that(results, equal_to([]))
     results.pipeline.run()
 
   def test_basic_empty_missing(self):
     """Test that the correct empty result is returned for a missing month."""
     results = self._get_result_for_month(4)
-    beam.assert_that(results, beam.equal_to([]))
+    assert_that(results, equal_to([]))
     results.pipeline.run()
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 5aaba10..4f53c61 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -40,6 +40,8 @@ from apache_beam.io import ReadFromText
 from apache_beam.io import WriteToText
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 def run(argv=None, assert_results=None):
@@ -118,12 +120,12 @@ def run(argv=None, assert_results=None):
   # TODO(silviuc): Move the assert_results logic to the unit test.
   if assert_results is not None:
     expected_luddites, expected_writers, expected_nomads = assert_results
-    beam.assert_that(num_luddites, beam.equal_to([expected_luddites]),
-                     label='assert:luddites')
-    beam.assert_that(num_writers, beam.equal_to([expected_writers]),
-                     label='assert:writers')
-    beam.assert_that(num_nomads, beam.equal_to([expected_nomads]),
-                     label='assert:nomads')
+    assert_that(num_luddites, equal_to([expected_luddites]),
+                label='assert:luddites')
+    assert_that(num_writers, equal_to([expected_writers]),
+                label='assert:writers')
+    assert_that(num_nomads, equal_to([expected_nomads]),
+                label='assert:nomads')
   # Execute pipeline.
   return p.run()
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 1bdb9a3..7259572 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -33,6 +33,8 @@ string. The tags can contain only letters, digits and _.
 import apache_beam as beam
 from apache_beam.metrics import Metrics
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 # Quiet some pylint warnings that happen because of the somewhat special
 # format for the code snippets.
@@ -566,8 +568,9 @@ def examples_wordcount_debugging(renames):
       | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
 
   # [START example_wordcount_debugging_assert]
-  beam.assert_that(
-      filtered_words, beam.equal_to([('Flourish', 3), ('stomach', 1)]))
+  beam.testing.util.assert_that(
+      filtered_words, beam.testing.util.equal_to(
+          [('Flourish', 3), ('stomach', 1)]))
   # [END example_wordcount_debugging_assert]
 
   output = (filtered_words
@@ -661,8 +664,8 @@ def model_custom_source(count):
   # [END model_custom_source_use_new_source]
 
   lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
-  beam.assert_that(
-      lines, beam.equal_to(
+  assert_that(
+      lines, equal_to(
           ['line ' + str(number) for number in range(0, count)]))
 
   p.run().wait_until_finish()
@@ -691,8 +694,8 @@ def model_custom_source(count):
   # [END model_custom_source_use_ptransform]
 
   lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
-  beam.assert_that(
-      lines, beam.equal_to(
+  assert_that(
+      lines, equal_to(
           ['line ' + str(number) for number in range(0, count)]))
 
   # Don't test runner api due to pickling errors.
@@ -872,7 +875,7 @@ def model_textio_compressed(renames, expected):
       compression_type=beam.io.filesystem.CompressionTypes.GZIP)
   # [END model_textio_write_compressed]
 
-  beam.assert_that(lines, beam.equal_to(expected))
+  assert_that(lines, equal_to(expected))
   p.visit(SnippetUtils.RenameFiles(renames))
   p.run().wait_until_finish()
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 37cd470..f7b51a7 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -30,10 +30,10 @@ from apache_beam import coders
 from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.coders.coders import ToStringCoder
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.examples.snippets import snippets
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 from apache_beam.utils.windowed_value import WindowedValue
 
 # pylint: disable=expression-not-assigned
@@ -158,11 +158,11 @@ class ParDoTest(unittest.TestCase):
                                                     avg_word_len))
     # [END model_pardo_side_input]
 
-    beam.assert_that(small_words, beam.equal_to(['a', 'bb', 'ccc']))
-    beam.assert_that(larger_than_average, beam.equal_to(['ccc', 'dddd']),
-                     label='larger_than_average')
-    beam.assert_that(small_but_nontrivial, beam.equal_to(['bb']),
-                     label='small_but_not_trivial')
+    assert_that(small_words, equal_to(['a', 'bb', 'ccc']))
+    assert_that(larger_than_average, equal_to(['ccc', 'dddd']),
+                label='larger_than_average')
+    assert_that(small_but_nontrivial, equal_to(['bb']),
+                label='small_but_not_trivial')
     p.run()
 
   def test_pardo_side_input_dofn(self):
@@ -816,7 +816,7 @@ class CombineTest(unittest.TestCase):
               | 'group' >> beam.GroupByKey()
               | 'combine' >> beam.CombineValues(sum))
     unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
-    beam.assert_that(unkeyed, beam.equal_to([110, 215, 120]))
+    assert_that(unkeyed, equal_to([110, 215, 120]))
     p.run()
 
   def test_setting_sliding_windows(self):
@@ -834,8 +834,8 @@ class CombineTest(unittest.TestCase):
               | 'group' >> beam.GroupByKey()
               | 'combine' >> beam.CombineValues(sum))
     unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
-    beam.assert_that(unkeyed,
-                     beam.equal_to([2, 2, 2, 18, 23, 39, 39, 39, 41, 41]))
+    assert_that(unkeyed,
+                equal_to([2, 2, 2, 18, 23, 39, 39, 39, 41, 41]))
     p.run()
 
   def test_setting_session_windows(self):
@@ -853,8 +853,8 @@ class CombineTest(unittest.TestCase):
               | 'group' >> beam.GroupByKey()
               | 'combine' >> beam.CombineValues(sum))
     unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
-    beam.assert_that(unkeyed,
-                     beam.equal_to([29, 27]))
+    assert_that(unkeyed,
+                equal_to([29, 27]))
     p.run()
 
   def test_setting_global_window(self):
@@ -872,7 +872,7 @@ class CombineTest(unittest.TestCase):
               | 'group' >> beam.GroupByKey()
               | 'combine' >> beam.CombineValues(sum))
     unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
-    beam.assert_that(unkeyed, beam.equal_to([56]))
+    assert_that(unkeyed, equal_to([56]))
     p.run()
 
   def test_setting_timestamp(self):
@@ -903,7 +903,7 @@ class CombineTest(unittest.TestCase):
               | 'group' >> beam.GroupByKey()
               | 'combine' >> beam.CombineValues(sum))
     unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
-    beam.assert_that(unkeyed, beam.equal_to([42, 187]))
+    assert_that(unkeyed, equal_to([42, 187]))
     p.run()
 
 
@@ -921,7 +921,7 @@ class PTransformTest(unittest.TestCase):
 
     p = TestPipeline()
     lengths = p | beam.Create(["a", "ab", "abc"]) | ComputeWordLengths()
-    beam.assert_that(lengths, beam.equal_to([1, 2, 3]))
+    assert_that(lengths, equal_to([1, 2, 3]))
     p.run()
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index 98acde4..ca9f7b6 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -51,6 +51,8 @@ from apache_beam.io import WriteToText
 from apache_beam.metrics import Metrics
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class FilterTextFn(beam.DoFn):
@@ -133,8 +135,8 @@ def run(argv=None):
   # of the Pipeline implies that the expectations were  met. Learn more at
   # https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to
   # test your pipeline.
-  beam.assert_that(
-      filtered_words, beam.equal_to([('Flourish', 3), ('stomach', 1)]))
+  assert_that(
+      filtered_words, equal_to([('Flourish', 3), ('stomach', 1)]))
 
   # Format the counts into a PCollection of strings and write the output using a
   # "Write" transform that has side effects.

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index 4a21839..6dcf121 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -27,10 +27,10 @@ from apache_beam.io import avroio
 from apache_beam.io import filebasedsource
 from apache_beam.io import source_test_utils
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
 
 # Importing following private class for testing purposes.
 from apache_beam.io.avroio import _AvroSource as AvroSource

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/concat_source_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py
index a02f9ad..4a8f519 100644
--- a/sdks/python/apache_beam/io/concat_source_test.py
+++ b/sdks/python/apache_beam/io/concat_source_test.py
@@ -27,8 +27,8 @@ from apache_beam.io import range_trackers
 from apache_beam.io import source_test_utils
 from apache_beam.io.concat_source import ConcatSource
 from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class RangeSource(iobase.BoundedSource):

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index e17a004..afb340d 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -41,10 +41,10 @@ from apache_beam.io.filebasedsource import FileBasedSource
 from apache_beam.options.value_provider import StaticValueProvider
 from apache_beam.options.value_provider import RuntimeValueProvider
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
 
 
 class LineSource(FileBasedSource):

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index c0b8ad6..10d401b 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -28,8 +28,8 @@ from apache_beam import coders
 from apache_beam.io import iobase
 from apache_beam.io import range_trackers
 from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 class LineSource(iobase.BoundedSource):

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index d00afef..9a4ec47 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -44,9 +44,8 @@ from apache_beam.io.filebasedsource_test import write_pattern
 from apache_beam.io.filesystem import CompressionTypes
 
 from apache_beam.testing.test_pipeline import TestPipeline
-
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 
 
 # TODO: Refactor code so all io tests are using same library

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/tfrecordio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py
index b7e370d..3c70ade 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -36,6 +36,8 @@ from apache_beam.io.tfrecordio import _TFRecordUtil
 from apache_beam.io.tfrecordio import ReadFromTFRecord
 from apache_beam.io.tfrecordio import WriteToTFRecord
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 import crcmod
 
 
@@ -254,7 +256,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
                         coder=coders.BytesCoder(),
                         compression_type=CompressionTypes.AUTO,
                         validate=True)))
-      beam.assert_that(result, beam.equal_to(['foo']))
+      assert_that(result, equal_to(['foo']))
 
   def test_process_multiple(self):
     path = os.path.join(self._new_tempdir(), 'result')
@@ -267,7 +269,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
                         coder=coders.BytesCoder(),
                         compression_type=CompressionTypes.AUTO,
                         validate=True)))
-      beam.assert_that(result, beam.equal_to(['foo', 'bar']))
+      assert_that(result, equal_to(['foo', 'bar']))
 
   def test_process_gzip(self):
     path = os.path.join(self._new_tempdir(), 'result')
@@ -280,7 +282,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
                         coder=coders.BytesCoder(),
                         compression_type=CompressionTypes.GZIP,
                         validate=True)))
-      beam.assert_that(result, beam.equal_to(['foo', 'bar']))
+      assert_that(result, equal_to(['foo', 'bar']))
 
   def test_process_auto(self):
     path = os.path.join(self._new_tempdir(), 'result.gz')
@@ -293,7 +295,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
                         coder=coders.BytesCoder(),
                         compression_type=CompressionTypes.AUTO,
                         validate=True)))
-      beam.assert_that(result, beam.equal_to(['foo', 'bar']))
+      assert_that(result, equal_to(['foo', 'bar']))
 
 
 class TestReadFromTFRecordSource(TestTFRecordSource):
@@ -305,7 +307,7 @@ class TestReadFromTFRecordSource(TestTFRecordSource):
       result = (p
                 | ReadFromTFRecord(
                     path, compression_type=CompressionTypes.GZIP))
-      beam.assert_that(result, beam.equal_to(['foo', 'bar']))
+      assert_that(result, equal_to(['foo', 'bar']))
 
   def test_process_gzip_auto(self):
     path = os.path.join(self._new_tempdir(), 'result.gz')
@@ -314,7 +316,7 @@ class TestReadFromTFRecordSource(TestTFRecordSource):
       result = (p
                 | ReadFromTFRecord(
                     path, compression_type=CompressionTypes.AUTO))
-      beam.assert_that(result, beam.equal_to(['foo', 'bar']))
+      assert_that(result, equal_to(['foo', 'bar']))
 
 
 class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
@@ -337,7 +339,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
     # Read the file back and compare.
     with TestPipeline() as p:
       actual_data = p | ReadFromTFRecord(file_path_prefix + '-*')
-      beam.assert_that(actual_data, beam.equal_to(expected_data))
+      assert_that(actual_data, equal_to(expected_data))
 
   def test_end2end_auto_compression(self):
     file_path_prefix = os.path.join(self._new_tempdir(), 'result')
@@ -351,7 +353,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
     # Read the file back and compare.
     with TestPipeline() as p:
       actual_data = p | ReadFromTFRecord(file_path_prefix + '-*')
-      beam.assert_that(actual_data, beam.equal_to(expected_data))
+      assert_that(actual_data, equal_to(expected_data))
 
   def test_end2end_auto_compression_unsharded(self):
     file_path_prefix = os.path.join(self._new_tempdir(), 'result')
@@ -365,7 +367,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
     # Read the file back and compare.
     with TestPipeline() as p:
       actual_data = p | ReadFromTFRecord(file_path_prefix + '.gz')
-      beam.assert_that(actual_data, beam.equal_to(expected_data))
+      assert_that(actual_data, equal_to(expected_data))
 
   @unittest.skipIf(tf is None, 'tensorflow not installed.')
   def test_end2end_example_proto(self):
@@ -385,7 +387,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
       actual_data = (p | ReadFromTFRecord(
           file_path_prefix + '-*',
           coder=beam.coders.ProtoCoder(example.__class__)))
-      beam.assert_that(actual_data, beam.equal_to([example]))
+      assert_that(actual_data, equal_to([example]))
 
   def test_end2end_read_write_read(self):
     path = os.path.join(self._new_tempdir(), 'result')
@@ -400,7 +402,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
     # Read the file back and compare.
     with TestPipeline() as p:
       actual_data = p | ReadFromTFRecord(path+'-*', validate=True)
-      beam.assert_that(actual_data, beam.equal_to(expected_data))
+      assert_that(actual_data, equal_to(expected_data))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 8aa8a8a..e0775d1 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -33,6 +33,8 @@ from apache_beam.pipeline import PipelineVisitor
 from apache_beam.pvalue import AsSingleton
 from apache_beam.runners.dataflow.native_io.iobase import NativeSource
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 from apache_beam.transforms import CombineGlobally
 from apache_beam.transforms import Create
 from apache_beam.transforms import FlatMap
@@ -41,8 +43,6 @@ from apache_beam.transforms import DoFn
 from apache_beam.transforms import ParDo
 from apache_beam.transforms import PTransform
 from apache_beam.transforms import WindowInto
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
 from apache_beam.transforms.window import SlidingWindows
 from apache_beam.transforms.window import TimestampedValue
 from apache_beam.utils.timestamp import MIN_TIMESTAMP

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
index 062e6f9..b7ba15a 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
@@ -28,9 +28,9 @@ from apache_beam.metrics.execution import MetricsEnvironment
 from apache_beam.metrics.metricbase import MetricName
 
 from apache_beam.pvalue import AsList
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import BeamAssertException
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import BeamAssertException
+from apache_beam.testing.util import equal_to
 from apache_beam.transforms.window import TimestampedValue
 from apache_beam.runners.portability import maptask_executor_runner
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index c61c49f..fa80b1c 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -36,8 +36,8 @@ from apache_beam.metrics.metricbase import MetricName
 from apache_beam.pipeline import Pipeline
 from apache_beam.runners import DirectRunner
 from apache_beam.runners import create_runner
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 from apache_beam.options.pipeline_options import PipelineOptions
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/testing/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
new file mode 100644
index 0000000..60a6b21
--- /dev/null
+++ b/sdks/python/apache_beam/testing/util.py
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utilities for testing Beam pipelines."""
+
+from __future__ import absolute_import
+
+from apache_beam import pvalue
+from apache_beam.transforms import window
+from apache_beam.transforms.core import Create
+from apache_beam.transforms.core import Map
+from apache_beam.transforms.core import WindowInto
+from apache_beam.transforms.util import CoGroupByKey
+from apache_beam.transforms.ptransform import PTransform
+
+
+__all__ = [
+    'assert_that',
+    'equal_to',
+    'is_empty',
+    ]
+
+
+class BeamAssertException(Exception):
+  """Exception raised by matcher classes used by assert_that transform."""
+
+  pass
+
+
+# Note that equal_to always sorts the expected and actual since what we
+# compare are PCollections for which there is no guaranteed order.
+# However the sorting does not go beyond top level therefore [1,2] and [2,1]
+# are considered equal and [[1,2]] and [[2,1]] are not.
+# TODO(silviuc): Add contains_in_any_order-style matchers.
+def equal_to(expected):
+  expected = list(expected)
+
+  def _equal(actual):
+    sorted_expected = sorted(expected)
+    sorted_actual = sorted(actual)
+    if sorted_expected != sorted_actual:
+      raise BeamAssertException(
+          'Failed assert: %r == %r' % (sorted_expected, sorted_actual))
+  return _equal
+
+
+def is_empty():
+  def _empty(actual):
+    actual = list(actual)
+    if actual:
+      raise BeamAssertException(
+          'Failed assert: [] == %r' % actual)
+  return _empty
+
+
+def assert_that(actual, matcher, label='assert_that'):
+  """A PTransform that checks a PCollection has an expected value.
+
+  Note that assert_that should be used only for testing pipelines since the
+  check relies on materializing the entire PCollection being checked.
+
+  Args:
+    actual: A PCollection.
+    matcher: A matcher function taking as argument the actual value of a
+      materialized PCollection. The matcher validates this actual value against
+      expectations and raises BeamAssertException if they are not met.
+    label: Optional string label. This is needed in case several assert_that
+      transforms are introduced in the same pipeline.
+
+  Returns:
+    Ignored.
+  """
+  assert isinstance(actual, pvalue.PCollection)
+
+  class AssertThat(PTransform):
+
+    def expand(self, pcoll):
+      # We must have at least a single element to ensure the matcher
+      # code gets run even if the input pcollection is empty.
+      keyed_singleton = pcoll.pipeline | Create([(None, None)])
+      keyed_actual = (
+          pcoll
+          | WindowInto(window.GlobalWindows())
+          | "ToVoidKey" >> Map(lambda v: (None, v)))
+      _ = ((keyed_singleton, keyed_actual)
+           | "Group" >> CoGroupByKey()
+           | "Unkey" >> Map(lambda (k, (_, actual_values)): actual_values)
+           | "Match" >> Map(matcher))
+
+    def default_label(self):
+      return label
+
+  actual | AssertThat()  # pylint: disable=expression-not-assigned
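
For orientation, a minimal sketch of how the relocated helpers are intended to be used from their new module (the pipeline, labels, and values below are illustrative, not taken from any diff in this commit). Note that equal_to sorts only at the top level, so [1, 2] matches [2, 1] while [[1, 2]] does not match [[2, 1]], and each assert_that in the same pipeline needs a distinct label:

    import apache_beam as beam
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.util import assert_that, equal_to, is_empty

    with TestPipeline() as p:
      numbers = p | 'Numbers' >> beam.Create([3, 1, 2])
      # This filter keeps nothing, so the output PCollection is empty.
      big = numbers | 'KeepBig' >> beam.Filter(lambda x: x > 10)
      assert_that(numbers, equal_to([1, 2, 3]), label='CheckNumbers')
      assert_that(big, is_empty(), label='CheckBig')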

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/testing/util_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/util_test.py b/sdks/python/apache_beam/testing/util_test.py
new file mode 100644
index 0000000..1acebb6
--- /dev/null
+++ b/sdks/python/apache_beam/testing/util_test.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for testing utilities."""
+
+import unittest
+
+from apache_beam import Create
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to, is_empty
+
+
+class UtilTest(unittest.TestCase):
+
+  def test_assert_that_passes(self):
+    with TestPipeline() as p:
+      assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3]))
+
+  def test_assert_that_fails(self):
+    with self.assertRaises(Exception):
+      with TestPipeline() as p:
+        assert_that(p | Create([1, 10, 100]), equal_to([1, 2, 3]))
+
+  def test_assert_that_fails_on_empty_input(self):
+    with self.assertRaises(Exception):
+      with TestPipeline() as p:
+        assert_that(p | Create([]), equal_to([1, 2, 3]))
+
+  def test_assert_that_fails_on_empty_expected(self):
+    with self.assertRaises(Exception):
+      with TestPipeline() as p:
+        assert_that(p | Create([1, 2, 3]), is_empty())
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 1822c19..946a60a 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -24,13 +24,13 @@ import hamcrest as hc
 import apache_beam as beam
 from apache_beam.testing.test_pipeline import TestPipeline
 import apache_beam.transforms.combiners as combine
+from apache_beam.testing.util import assert_that, equal_to
 from apache_beam.transforms.core import CombineGlobally
 from apache_beam.transforms.core import Create
 from apache_beam.transforms.core import Map
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.transforms.ptransform import PTransform
-from apache_beam.transforms.util import assert_that, equal_to
 
 
 class CombineTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/create_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/create_test.py b/sdks/python/apache_beam/transforms/create_test.py
index 9ede4c7..55ad7f3 100644
--- a/sdks/python/apache_beam/transforms/create_test.py
+++ b/sdks/python/apache_beam/transforms/create_test.py
@@ -20,9 +20,10 @@ import unittest
 
 from apache_beam.io import source_test_utils
 
-from apache_beam import Create, assert_that, equal_to
+from apache_beam import Create
 from apache_beam.coders import FastPrimitivesCoder
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
 
 
 class CreateTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 3320d79..f790660 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -33,12 +33,12 @@ from apache_beam.io.iobase import Read
 from apache_beam.options.pipeline_options import TypeOptions
 import apache_beam.pvalue as pvalue
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
 from apache_beam.transforms import window
 from apache_beam.transforms.core import GroupByKeyOnly
 import apache_beam.transforms.combiners as combine
 from apache_beam.transforms.display import DisplayData, DisplayDataItem
 from apache_beam.transforms.ptransform import PTransform
-from apache_beam.transforms.util import assert_that, equal_to
 import apache_beam.typehints as typehints
 from apache_beam.typehints import with_input_types
 from apache_beam.typehints import with_output_types

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 0bc9107..6500681 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -24,8 +24,8 @@ from nose.plugins.attrib import attr
 
 import apache_beam as beam
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
 from apache_beam.transforms import window
-from apache_beam.transforms.util import assert_that, equal_to
 
 
 class SideInputsTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 2574c4b..a27f47f 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -27,6 +27,7 @@ import yaml
 import apache_beam as beam
 from apache_beam.runners import pipeline_context
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
 from apache_beam.transforms import trigger
 from apache_beam.transforms.core import Windowing
 from apache_beam.transforms.trigger import AccumulationMode
@@ -40,7 +41,6 @@ from apache_beam.transforms.trigger import GeneralTriggerDriver
 from apache_beam.transforms.trigger import InMemoryUnmergedState
 from apache_beam.transforms.trigger import Repeatedly
 from apache_beam.transforms.trigger import TriggerFn
-from apache_beam.transforms.util import assert_that, equal_to
 from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import IntervalWindow
 from apache_beam.transforms.window import MIN_TIMESTAMP

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index a6ecf0a..a7484ac 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -20,14 +20,10 @@
 
 from __future__ import absolute_import
 
-from apache_beam import pvalue
-from apache_beam.transforms import window
 from apache_beam.transforms.core import CombinePerKey
-from apache_beam.transforms.core import Create
 from apache_beam.transforms.core import Flatten
 from apache_beam.transforms.core import GroupByKey
 from apache_beam.transforms.core import Map
-from apache_beam.transforms.core import WindowInto
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.ptransform import ptransform_fn
 
@@ -38,9 +34,6 @@ __all__ = [
     'KvSwap',
     'RemoveDuplicates',
     'Values',
-    'assert_that',
-    'equal_to',
-    'is_empty',
     ]
 
 
@@ -169,75 +162,3 @@ def RemoveDuplicates(pcoll):  # pylint: disable=invalid-name
           | 'ToPairs' >> Map(lambda v: (v, None))
           | 'Group' >> CombinePerKey(lambda vs: None)
           | 'RemoveDuplicates' >> Keys())
-
-
-class BeamAssertException(Exception):
-  """Exception raised by matcher classes used by assert_that transform."""
-
-  pass
-
-
-# Note that equal_to always sorts the expected and actual since what we
-# compare are PCollections for which there is no guaranteed order.
-# However the sorting does not go beyond top level therefore [1,2] and [2,1]
-# are considered equal and [[1,2]] and [[2,1]] are not.
-# TODO(silviuc): Add contains_in_any_order-style matchers.
-def equal_to(expected):
-  expected = list(expected)
-
-  def _equal(actual):
-    sorted_expected = sorted(expected)
-    sorted_actual = sorted(actual)
-    if sorted_expected != sorted_actual:
-      raise BeamAssertException(
-          'Failed assert: %r == %r' % (sorted_expected, sorted_actual))
-  return _equal
-
-
-def is_empty():
-  def _empty(actual):
-    actual = list(actual)
-    if actual:
-      raise BeamAssertException(
-          'Failed assert: [] == %r' % actual)
-  return _empty
-
-
-def assert_that(actual, matcher, label='assert_that'):
-  """A PTransform that checks a PCollection has an expected value.
-
-  Note that assert_that should be used only for testing pipelines since the
-  check relies on materializing the entire PCollection being checked.
-
-  Args:
-    actual: A PCollection.
-    matcher: A matcher function taking as argument the actual value of a
-      materialized PCollection. The matcher validates this actual value against
-      expectations and raises BeamAssertException if they are not met.
-    label: Optional string label. This is needed in case several assert_that
-      transforms are introduced in the same pipeline.
-
-  Returns:
-    Ignored.
-  """
-  assert isinstance(actual, pvalue.PCollection)
-
-  class AssertThat(PTransform):
-
-    def expand(self, pcoll):
-      # We must have at least a single element to ensure the matcher
-      # code gets run even if the input pcollection is empty.
-      keyed_singleton = pcoll.pipeline | Create([(None, None)])
-      keyed_actual = (
-          pcoll
-          | WindowInto(window.GlobalWindows())
-          | "ToVoidKey" >> Map(lambda v: (None, v)))
-      _ = ((keyed_singleton, keyed_actual)
-           | "Group" >> CoGroupByKey()
-           | "Unkey" >> Map(lambda (k, (_, actual_values)): actual_values)
-           | "Match" >> Map(matcher))
-
-    def default_label(self):
-      return label
-
-  actual | AssertThat()  # pylint: disable=expression-not-assigned
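
With the helpers deleted here and recreated under apache_beam/testing (see the new file above), downstream code only needs to update its imports; the functions themselves are unchanged. Roughly:

    # Before this commit:
    # from apache_beam.transforms.util import assert_that, equal_to, is_empty

    # After this commit:
    from apache_beam.testing.util import assert_that, equal_to, is_empty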

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/util_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
deleted file mode 100644
index 7fdef70..0000000
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ /dev/null
@@ -1,50 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the util transforms."""
-
-import unittest
-
-from apache_beam import Create
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that, equal_to, is_empty
-
-
-class UtilTest(unittest.TestCase):
-
-  def test_assert_that_passes(self):
-    with TestPipeline() as p:
-      assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3]))
-
-  def test_assert_that_fails(self):
-    with self.assertRaises(Exception):
-      with TestPipeline() as p:
-        assert_that(p | Create([1, 10, 100]), equal_to([1, 2, 3]))
-
-  def test_assert_that_fails_on_empty_input(self):
-    with self.assertRaises(Exception):
-      with TestPipeline() as p:
-        assert_that(p | Create([]), equal_to([1, 2, 3]))
-
-  def test_assert_that_fails_on_empty_expected(self):
-    with self.assertRaises(Exception):
-      with TestPipeline() as p:
-        assert_that(p | Create([1, 2, 3]), is_empty())
-
-
-if __name__ == '__main__':
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index a7797dd..fd1bb9d 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -21,6 +21,7 @@ import unittest
 
 from apache_beam.runners import pipeline_context
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
 from apache_beam.transforms import CombinePerKey
 from apache_beam.transforms import combiners
 from apache_beam.transforms import core
@@ -31,7 +32,6 @@ from apache_beam.transforms import WindowInto
 from apache_beam.transforms.core import Windowing
 from apache_beam.transforms.trigger import AccumulationMode
 from apache_beam.transforms.trigger import AfterCount
-from apache_beam.transforms.util import assert_that, equal_to
 from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import GlobalWindow
 from apache_beam.transforms.window import GlobalWindows

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/write_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py
index 27e7caa..e31b9cc 100644
--- a/sdks/python/apache_beam/transforms/write_ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py
@@ -23,8 +23,8 @@ import apache_beam as beam
 
 from apache_beam.io import iobase
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, is_empty
 from apache_beam.transforms.ptransform import PTransform
-from apache_beam.transforms.util import assert_that, is_empty
 
 
 class _TestSink(iobase.Sink):

http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 3494cfe..589dc0e 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -25,7 +25,7 @@ from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.options.pipeline_options import OptionsContext
 from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that, equal_to
+from apache_beam.testing.util import assert_that, equal_to
 from apache_beam.typehints import WithTypeHints
 
 # These test often construct a pipeline as value | PTransform to test side


[18/19] beam git commit: Add __all__ tags to modules in package apache_beam/testing

Posted by al...@apache.org.
Add __all__ tags to modules in package apache_beam/testing


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0f910b43
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0f910b43
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0f910b43

Branch: refs/heads/release-2.0.0
Commit: 0f910b43cbccca73d3b492560b9fc0d3f0901e21
Parents: a6f5888
Author: Charles Chen <cc...@google.com>
Authored: Wed May 10 23:20:20 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:54:12 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/testing/pipeline_verifiers.py |  8 ++++++++
 sdks/python/apache_beam/testing/test_pipeline.py      |  5 +++++
 sdks/python/apache_beam/testing/test_stream.py        | 14 +++++++++++++-
 sdks/python/apache_beam/testing/test_utils.py         |  6 +++++-
 4 files changed, 31 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0f910b43/sdks/python/apache_beam/testing/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/pipeline_verifiers.py b/sdks/python/apache_beam/testing/pipeline_verifiers.py
index 5a6082a..a08eb54 100644
--- a/sdks/python/apache_beam/testing/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/testing/pipeline_verifiers.py
@@ -32,6 +32,14 @@ from apache_beam.runners.runner import PipelineState
 from apache_beam.testing import test_utils as utils
 from apache_beam.utils import retry
 
+
+__all__ = [
+    'PipelineStateMatcher',
+    'FileChecksumMatcher',
+    'retry_on_io_error_and_server_error',
+    ]
+
+
 try:
   from apitools.base.py.exceptions import HttpError
 except ImportError:

http://git-wip-us.apache.org/repos/asf/beam/blob/0f910b43/sdks/python/apache_beam/testing/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py
index 20f4839..13b1639 100644
--- a/sdks/python/apache_beam/testing/test_pipeline.py
+++ b/sdks/python/apache_beam/testing/test_pipeline.py
@@ -27,6 +27,11 @@ from apache_beam.options.pipeline_options import PipelineOptions
 from nose.plugins.skip import SkipTest
 
 
+__all__ = [
+    'TestPipeline',
+    ]
+
+
 class TestPipeline(Pipeline):
   """TestPipeline class is used inside of Beam tests that can be configured to
   run against pipeline runner.

http://git-wip-us.apache.org/repos/asf/beam/blob/0f910b43/sdks/python/apache_beam/testing/test_stream.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py
index 7ae27b7..a06bcd0 100644
--- a/sdks/python/apache_beam/testing/test_stream.py
+++ b/sdks/python/apache_beam/testing/test_stream.py
@@ -15,7 +15,10 @@
 # limitations under the License.
 #
 
-"""Provides TestStream for verifying streaming runner semantics."""
+"""Provides TestStream for verifying streaming runner semantics.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
 
 from abc import ABCMeta
 from abc import abstractmethod
@@ -28,6 +31,15 @@ from apache_beam.utils import timestamp
 from apache_beam.utils.windowed_value import WindowedValue
 
 
+__all__ = [
+    'Event',
+    'ElementEvent',
+    'WatermarkEvent',
+    'ProcessingTimeEvent',
+    'TestStream',
+    ]
+
+
 class Event(object):
   """Test stream event to be emitted during execution of a TestStream."""
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0f910b43/sdks/python/apache_beam/testing/test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_utils.py b/sdks/python/apache_beam/testing/test_utils.py
index 666207e..9feb80e 100644
--- a/sdks/python/apache_beam/testing/test_utils.py
+++ b/sdks/python/apache_beam/testing/test_utils.py
@@ -15,7 +15,10 @@
 # limitations under the License.
 #
 
-"""Utility methods for testing"""
+"""Utility methods for testing
+
+For internal use only; no backwards-compatibility guarantees.
+"""
 
 import hashlib
 import imp
@@ -23,6 +26,7 @@ from mock import Mock, patch
 
 from apache_beam.utils import retry
 
+
 DEFAULT_HASHING_ALG = 'sha1'
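
The __all__ lists added in this commit declare which names each testing module exports under a wildcard import and thereby mark the intended public surface. A small sketch of the effect, using a hypothetical module rather than any file touched above:

    # example_module.py
    __all__ = ['public_helper']

    def public_helper():
      return 'exported'

    def internal_detail():
      return 'not exported'

    # Client code:
    #   from example_module import *
    # brings in public_helper only; internal_detail is excluded from the
    # wildcard import (though it remains reachable as
    # example_module.internal_detail).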
 
 


[05/19] beam git commit: Add internal usage only comments to util/

Posted by al...@apache.org.
Add internal usage only comments to util/


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9730f56e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9730f56e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9730f56e

Branch: refs/heads/release-2.0.0
Commit: 9730f56eaf62c8164a517d5fe0cc03650c39a732
Parents: 65aa0ff
Author: Ahmet Altay <al...@google.com>
Authored: Thu May 11 11:34:59 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/utils/__init__.py    | 5 ++++-
 sdks/python/apache_beam/utils/annotations.py | 4 +++-
 sdks/python/apache_beam/utils/counters.py    | 5 ++++-
 sdks/python/apache_beam/utils/processes.py   | 6 +++++-
 sdks/python/apache_beam/utils/profiler.py    | 5 ++++-
 sdks/python/apache_beam/utils/proto_utils.py | 2 ++
 sdks/python/apache_beam/utils/retry.py       | 2 ++
 sdks/python/apache_beam/utils/timestamp.py   | 5 ++++-
 sdks/python/apache_beam/utils/urns.py        | 2 ++
 9 files changed, 30 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9730f56e/sdks/python/apache_beam/utils/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/__init__.py b/sdks/python/apache_beam/utils/__init__.py
index 74cf45d..635c80f 100644
--- a/sdks/python/apache_beam/utils/__init__.py
+++ b/sdks/python/apache_beam/utils/__init__.py
@@ -15,4 +15,7 @@
 # limitations under the License.
 #
 
-"""A package containing utilities."""
+"""A package containing internal utilities.
+
+For internal use only; no backwards-compatibility guarantees.
+"""

http://git-wip-us.apache.org/repos/asf/beam/blob/9730f56e/sdks/python/apache_beam/utils/annotations.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/annotations.py b/sdks/python/apache_beam/utils/annotations.py
index 92318b1..017dd6b 100644
--- a/sdks/python/apache_beam/utils/annotations.py
+++ b/sdks/python/apache_beam/utils/annotations.py
@@ -15,7 +15,9 @@
 # limitations under the License.
 #
 
-""" Deprecated and experimental annotations.
+"""Deprecated and experimental annotations.
+
+For internal use only; no backwards-compatibility guarantees.
 
 Annotations come in two flavors: deprecated and experimental
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9730f56e/sdks/python/apache_beam/utils/counters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/counters.py b/sdks/python/apache_beam/utils/counters.py
index e41d732..b379461 100644
--- a/sdks/python/apache_beam/utils/counters.py
+++ b/sdks/python/apache_beam/utils/counters.py
@@ -18,7 +18,10 @@
 # cython: profile=False
 # cython: overflowcheck=True
 
-"""Counters collect the progress of the Worker for reporting to the service."""
+"""Counters collect the progress of the Worker for reporting to the service.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
 
 import threading
 from apache_beam.transforms import cy_combiners

http://git-wip-us.apache.org/repos/asf/beam/blob/9730f56e/sdks/python/apache_beam/utils/processes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/processes.py b/sdks/python/apache_beam/utils/processes.py
index e089090..e5fd9c8 100644
--- a/sdks/python/apache_beam/utils/processes.py
+++ b/sdks/python/apache_beam/utils/processes.py
@@ -14,7 +14,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-"""Cross-platform utilities for creating subprocesses."""
+
+"""Cross-platform utilities for creating subprocesses.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
 
 import platform
 import subprocess

http://git-wip-us.apache.org/repos/asf/beam/blob/9730f56e/sdks/python/apache_beam/utils/profiler.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/profiler.py b/sdks/python/apache_beam/utils/profiler.py
index 852b659..a2c3f6a 100644
--- a/sdks/python/apache_beam/utils/profiler.py
+++ b/sdks/python/apache_beam/utils/profiler.py
@@ -15,7 +15,10 @@
 # limitations under the License.
 #
 
-"""A profiler context manager based on cProfile.Profile objects."""
+"""A profiler context manager based on cProfile.Profile objects.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
 
 import cProfile
 import logging

http://git-wip-us.apache.org/repos/asf/beam/blob/9730f56e/sdks/python/apache_beam/utils/proto_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/proto_utils.py b/sdks/python/apache_beam/utils/proto_utils.py
index d929a92..090a821 100644
--- a/sdks/python/apache_beam/utils/proto_utils.py
+++ b/sdks/python/apache_beam/utils/proto_utils.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+"""For internal use only; no backwards-compatibility guarantees."""
+
 from google.protobuf import any_pb2
 from google.protobuf import struct_pb2
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9730f56e/sdks/python/apache_beam/utils/retry.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py
index 2c32f0f..1a8b907 100644
--- a/sdks/python/apache_beam/utils/retry.py
+++ b/sdks/python/apache_beam/utils/retry.py
@@ -17,6 +17,8 @@
 
 """Retry decorators for calls raising exceptions.
 
+For internal use only; no backwards-compatibility guarantees.
+
 This module is used mostly to decorate all integration points where the code
 makes calls to remote services. Searching through the code base for @retry
 should find all such places. For this reason even places where retry is not

http://git-wip-us.apache.org/repos/asf/beam/blob/9730f56e/sdks/python/apache_beam/utils/timestamp.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/timestamp.py b/sdks/python/apache_beam/utils/timestamp.py
index 8b2ccda..5d1b48c 100644
--- a/sdks/python/apache_beam/utils/timestamp.py
+++ b/sdks/python/apache_beam/utils/timestamp.py
@@ -15,7 +15,10 @@
 # limitations under the License.
 #
 
-"""Timestamp utilities."""
+"""Timestamp utilities.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
 
 from __future__ import absolute_import
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9730f56e/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 46bd8f5..379b5ff 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+"""For internal use only; no backwards-compatibility guarantees."""
+
 import abc
 import inspect
 


[13/19] beam git commit: Remove some internal details from the public API.

Posted by al...@apache.org.
Remove some internal details from the public API.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e12cf0df
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e12cf0df
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e12cf0df

Branch: refs/heads/release-2.0.0
Commit: e12cf0dfd810ec26965ae98e5a2256f560c6ff6d
Parents: 2070f11
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed May 10 15:55:46 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:37 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py             |  4 ++--
 .../runners/dataflow/dataflow_runner.py         |  6 +++---
 .../runners/dataflow/dataflow_runner_test.py    |  6 +++---
 .../apache_beam/runners/direct/executor.py      |  2 +-
 .../runners/direct/transform_evaluator.py       | 10 ++++-----
 sdks/python/apache_beam/transforms/__init__.py  |  2 +-
 sdks/python/apache_beam/transforms/core.py      | 22 ++++++++++----------
 .../python/apache_beam/transforms/ptransform.py |  6 +++---
 .../apache_beam/transforms/ptransform_test.py   | 12 +++++------
 sdks/python/apache_beam/typehints/typecheck.py  |  2 +-
 10 files changed, 36 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 79480d7..5048534 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -77,8 +77,8 @@ class Pipeline(object):
   the PValues are the edges.
 
   All the transforms applied to the pipeline must have distinct full labels.
-  If same transform instance needs to be applied then a clone should be created
-  with a new label (e.g., transform.clone('new label')).
+  If the same transform instance needs to be applied then the right shift operator
+  should be used to designate new names (e.g. `input | "label" >> my_transform`).
   """
 
   def __init__(self, runner=None, options=None, argv=None):

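A small sketch of the renaming convention the updated docstring describes (the element values and labels are made up for illustration):

    import apache_beam as beam

    p = beam.Pipeline()
    lines = p | 'Create' >> beam.Create(['a', 'b', 'c'])
    # The right shift operator attaches a label to each application, so the
    # same kind of transform can appear twice without full labels colliding.
    upper = lines | 'Upper' >> beam.Map(str.upper)
    lower = lines | 'Lower' >> beam.Map(str.lower)
    p.run()
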
http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index da8de9d..0ecd22a 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -160,7 +160,7 @@ class DataflowRunner(PipelineRunner):
 
     class GroupByKeyInputVisitor(PipelineVisitor):
       """A visitor that replaces `Any` element type for input `PCollection` of
-      a `GroupByKey` or `GroupByKeyOnly` with a `KV` type.
+      a `GroupByKey` or `_GroupByKeyOnly` with a `KV` type.
 
       TODO(BEAM-115): Once Python SDk is compatible with the new Runner API,
       we could directly replace the coder instead of mutating the element type.
@@ -169,8 +169,8 @@ class DataflowRunner(PipelineRunner):
       def visit_transform(self, transform_node):
         # Imported here to avoid circular dependencies.
         # pylint: disable=wrong-import-order, wrong-import-position
-        from apache_beam.transforms.core import GroupByKey, GroupByKeyOnly
-        if isinstance(transform_node.transform, (GroupByKey, GroupByKeyOnly)):
+        from apache_beam.transforms.core import GroupByKey, _GroupByKeyOnly
+        if isinstance(transform_node.transform, (GroupByKey, _GroupByKeyOnly)):
           pcoll = transform_node.inputs[0]
           input_type = pcoll.element_type
           # If input_type is not specified, then treat it as `Any`.

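Schematically, the substitution the visitor performs amounts to the following (a restatement of the logic above, not the visitor's actual code):

    from apache_beam import typehints

    input_type = None  # element type of the GroupByKey input PCollection
    if input_type in (None, typehints.Any):
      # Treat an unspecified or Any-typed input as a key-value pair of
      # unknown types so a KV coder can be picked.
      input_type = typehints.KV[typehints.Any, typehints.Any]
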
http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index ac9b028..ff4b51d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -37,7 +37,7 @@ from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeExceptio
 from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayDataItem
-from apache_beam.transforms.core import GroupByKeyOnly
+from apache_beam.transforms.core import _GroupByKeyOnly
 from apache_beam.typehints import typehints
 
 # Protect against environments where apitools library is not available.
@@ -185,7 +185,7 @@ class DataflowRunnerTest(unittest.TestCase):
     pcoll1 = PCollection(p)
     pcoll2 = PCollection(p)
     pcoll3 = PCollection(p)
-    for transform in [GroupByKeyOnly(), beam.GroupByKey()]:
+    for transform in [_GroupByKeyOnly(), beam.GroupByKey()]:
       pcoll1.element_type = None
       pcoll2.element_type = typehints.Any
       pcoll3.element_type = typehints.KV[typehints.Any, typehints.Any]
@@ -199,7 +199,7 @@ class DataflowRunnerTest(unittest.TestCase):
     p = TestPipeline()
     pcoll1 = PCollection(p)
     pcoll2 = PCollection(p)
-    for transform in [GroupByKeyOnly(), beam.GroupByKey()]:
+    for transform in [_GroupByKeyOnly(), beam.GroupByKey()]:
       pcoll1.element_type = typehints.TupleSequenceConstraint
       pcoll2.element_type = typehints.Set
       err_msg = "Input to GroupByKey must be of Tuple or Any type"

http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 9efbede..86db291 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -155,7 +155,7 @@ class _SerialEvaluationState(_TransformEvaluationState):
   item of work will be submitted to the ExecutorService at any time.
 
   A principal use of this is for evaluators that keep a global state such as
-  GroupByKeyOnly.
+  _GroupByKeyOnly.
   """
 
   def __init__(self, executor_service, scheduled):

http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 6984ded..b1cb626 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -53,7 +53,7 @@ class TransformEvaluatorRegistry(object):
         io.Read: _BoundedReadEvaluator,
         core.Flatten: _FlattenEvaluator,
         core.ParDo: _ParDoEvaluator,
-        core.GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
+        core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
         _NativeWrite: _NativeWriteEvaluator,
     }
 
@@ -83,7 +83,7 @@ class TransformEvaluatorRegistry(object):
     """Returns True if this applied_ptransform should run one bundle at a time.
 
     Some TransformEvaluators use a global state object to keep track of their
-    global execution state. For example evaluator for GroupByKeyOnly uses this
+    global execution state. For example, the evaluator for _GroupByKeyOnly uses this
     state as an in memory dictionary to buffer keys.
 
     Serially executed evaluators will act as a syncing point in the graph and
@@ -99,7 +99,7 @@ class TransformEvaluatorRegistry(object):
       True if executor should execute applied_ptransform serially.
     """
     return isinstance(applied_ptransform.transform,
-                      (core.GroupByKeyOnly, _NativeWrite))
+                      (core._GroupByKeyOnly, _NativeWrite))
 
 
 class _TransformEvaluator(object):
@@ -325,7 +325,7 @@ class _ParDoEvaluator(_TransformEvaluator):
 
 
 class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
-  """TransformEvaluator for GroupByKeyOnly transform."""
+  """TransformEvaluator for _GroupByKeyOnly transform."""
 
   MAX_ELEMENT_PER_BUNDLE = None
 
@@ -369,7 +369,7 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
       k, v = element.value
       self.state.output[self.key_coder.encode(k)].append(v)
     else:
-      raise TypeCheckError('Input to GroupByKeyOnly must be a PCollection of '
+      raise TypeCheckError('Input to _GroupByKeyOnly must be a PCollection of '
                            'windowed key-value pairs. Instead received: %r.'
                            % element)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/transforms/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py
index 847fb8f..b77b0f6 100644
--- a/sdks/python/apache_beam/transforms/__init__.py
+++ b/sdks/python/apache_beam/transforms/__init__.py
@@ -21,5 +21,5 @@
 from apache_beam.transforms import combiners
 from apache_beam.transforms.core import *
 from apache_beam.transforms.ptransform import *
-from apache_beam.transforms.timeutil import *
+from apache_beam.transforms.timeutil import TimeDomain
 from apache_beam.transforms.util import *

http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index abe699f..0e497f9 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -215,7 +215,7 @@ class DoFn(WithTypeHints, HasDisplayData):
       return Any
     return type_hint
 
-  def process_argspec_fn(self):
+  def _process_argspec_fn(self):
     """Returns the Python callable that will eventually be invoked.
 
     This should ideally be the user-level function that is called with
@@ -307,7 +307,7 @@ class CallableWrapperDoFn(DoFn):
     return self._strip_output_annotations(
         trivial_inference.infer_return_type(self._fn, [input_type]))
 
-  def process_argspec_fn(self):
+  def _process_argspec_fn(self):
     return getattr(self._fn, '_argspec_fn', self._fn)
 
 
@@ -641,8 +641,8 @@ class ParDo(PTransformWithSideInputs):
       return fn
     return CallableWrapperDoFn(fn)
 
-  def process_argspec_fn(self):
-    return self.fn.process_argspec_fn()
+  def _process_argspec_fn(self):
+    return self.fn._process_argspec_fn()
 
   def display_data(self):
     return {'fn': DisplayDataItem(self.fn.__class__,
@@ -870,19 +870,19 @@ class CombineGlobally(PTransform):
   def default_label(self):
     return 'CombineGlobally(%s)' % ptransform.label_from_callable(self.fn)
 
-  def clone(self, **extra_attributes):
+  def _clone(self, **extra_attributes):
     clone = copy.copy(self)
     clone.__dict__.update(extra_attributes)
     return clone
 
   def with_defaults(self, has_defaults=True):
-    return self.clone(has_defaults=has_defaults)
+    return self._clone(has_defaults=has_defaults)
 
   def without_defaults(self):
     return self.with_defaults(False)
 
   def as_singleton_view(self):
-    return self.clone(as_view=True)
+    return self._clone(as_view=True)
 
   def expand(self, pcoll):
     def add_input_types(transform):
@@ -964,7 +964,7 @@ class CombinePerKey(PTransformWithSideInputs):
   def default_label(self):
     return '%s(%s)' % (self.__class__.__name__, self._fn_label)
 
-  def process_argspec_fn(self):
+  def _process_argspec_fn(self):
     return self.fn._fn  # pylint: disable=protected-access
 
   def expand(self, pcoll):
@@ -1133,7 +1133,7 @@ class GroupByKey(PTransform):
       return (pcoll
               | 'ReifyWindows' >> (ParDo(self.ReifyWindows())
                  .with_output_types(reify_output_type))
-              | 'GroupByKey' >> (GroupByKeyOnly()
+              | 'GroupByKey' >> (_GroupByKeyOnly()
                  .with_input_types(reify_output_type)
                  .with_output_types(gbk_input_type))
               | ('GroupByWindow' >> ParDo(
@@ -1144,14 +1144,14 @@ class GroupByKey(PTransform):
       # The input_type is None, run the default
       return (pcoll
               | 'ReifyWindows' >> ParDo(self.ReifyWindows())
-              | 'GroupByKey' >> GroupByKeyOnly()
+              | 'GroupByKey' >> _GroupByKeyOnly()
               | 'GroupByWindow' >> ParDo(
                     self.GroupAlsoByWindow(pcoll.windowing)))
 
 
 @typehints.with_input_types(typehints.KV[K, V])
 @typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
-class GroupByKeyOnly(PTransform):
+class _GroupByKeyOnly(PTransform):
   """A group by key transform, ignoring windows."""
   def infer_output_type(self, input_type):
     key_type, value_type = trivial_inference.key_value_types(input_type)

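As a usage-level sketch of what the renamed internals support (the element values are made up), GroupByKey still consumes a KV-typed PCollection and deduces an iterable value type:

    import apache_beam as beam
    from apache_beam import typehints

    p = beam.Pipeline()
    grouped = (p
               | beam.Create([('a', 1), ('a', 2), ('b', 3)])
                   .with_output_types(typehints.KV[str, int])
               | beam.GroupByKey())
    # With the KV[str, int] hint attached, the output element type is
    # deduced as KV[str, Iterable[int]].
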
http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 8898c36..bd2a120 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -303,7 +303,7 @@ class PTransform(WithTypeHints, HasDisplayData):
     # TODO(ccy): further refine this API.
     return None
 
-  def clone(self, new_label):
+  def _clone(self, new_label):
     """Clones the current transform instance under a new label."""
     transform = copy.copy(self)
     transform.label = new_label
@@ -567,7 +567,7 @@ class PTransformWithSideInputs(PTransform):
 
       arg_types = [pvalueish.element_type] + [element_type(v) for v in args]
       kwargs_types = {k: element_type(v) for (k, v) in kwargs.items()}
-      argspec_fn = self.process_argspec_fn()
+      argspec_fn = self._process_argspec_fn()
       bindings = getcallargs_forhints(argspec_fn, *arg_types, **kwargs_types)
       hints = getcallargs_forhints(argspec_fn, *type_hints[0], **type_hints[1])
       for arg, hint in hints.items():
@@ -581,7 +581,7 @@ class PTransformWithSideInputs(PTransform):
               'Type hint violation for \'%s\': requires %s but got %s for %s'
               % (self.label, hint, bindings[arg], arg))
 
-  def process_argspec_fn(self):
+  def _process_argspec_fn(self):
     """Returns an argspec of the function actually consuming the data.
     """
     raise NotImplementedError

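A brief sketch of the user-facing side of this hint binding (a minimal example, not taken from the test suite): declared hints are matched against the wrapped callable's arguments, and a mismatch surfaces as a type hint violation when the transform is applied.

    import apache_beam as beam

    # The declared hints are checked against the wrapped callable (len here)
    # and against upstream element types when the transform is applied.
    to_length = beam.Map(len).with_input_types(str).with_output_types(int)
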
http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index f790660..efc5978 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -35,7 +35,7 @@ import apache_beam.pvalue as pvalue
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that, equal_to
 from apache_beam.transforms import window
-from apache_beam.transforms.core import GroupByKeyOnly
+from apache_beam.transforms.core import _GroupByKeyOnly
 import apache_beam.transforms.combiners as combine
 from apache_beam.transforms.display import DisplayData, DisplayDataItem
 from apache_beam.transforms.ptransform import PTransform
@@ -580,7 +580,7 @@ class PTransformTest(unittest.TestCase):
     pipeline = TestPipeline()
     pcolls = pipeline | 'A' >> beam.Create(['a', 'b', 'f'])
     with self.assertRaises(typehints.TypeCheckError) as cm:
-      pcolls | 'D' >> GroupByKeyOnly()
+      pcolls | 'D' >> _GroupByKeyOnly()
       pipeline.run()
 
     expected_error_prefix = ('Input type hint violation at D: expected '
@@ -1088,7 +1088,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
          | 'Str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
          | ('Pair' >> beam.Map(lambda x: (x, ord(x)))
             .with_output_types(typehints.KV[str, str]))
-         | GroupByKeyOnly())
+         | _GroupByKeyOnly())
 
     # Output type should correctly be deduced.
     # GBK-only should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
@@ -1112,7 +1112,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
        | beam.Create([1, 2, 3]).with_output_types(int)
-       | 'F' >> GroupByKeyOnly())
+       | 'F' >> _GroupByKeyOnly())
 
     self.assertEqual("Input type hint violation at F: "
                      "expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -1155,7 +1155,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       (self.p
        | 'Nums' >> beam.Create(range(5)).with_output_types(int)
        | 'ModDup' >> beam.Map(lambda x: (x % 2, x))
-       | GroupByKeyOnly())
+       | _GroupByKeyOnly())
 
     self.assertEqual('Pipeline type checking is enabled, however no output '
                      'type-hint was found for the PTransform '
@@ -1978,7 +1978,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_gbk_type_inference(self):
     self.assertEqual(
         typehints.Tuple[str, typehints.Iterable[int]],
-        GroupByKeyOnly().infer_output_type(typehints.KV[str, int]))
+        _GroupByKeyOnly().infer_output_type(typehints.KV[str, int]))
 
   def test_pipeline_inference(self):
     created = self.p | beam.Create(['a', 'b', 'c'])

http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/typehints/typecheck.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py
index 09b73f9..89a5f5c 100644
--- a/sdks/python/apache_beam/typehints/typecheck.py
+++ b/sdks/python/apache_beam/typehints/typecheck.py
@@ -109,7 +109,7 @@ class TypeCheckWrapperDoFn(AbstractDoFnWrapper):
   def __init__(self, dofn, type_hints, label=None):
     super(TypeCheckWrapperDoFn, self).__init__(dofn)
     self.dofn = dofn
-    self._process_fn = self.dofn.process_argspec_fn()
+    self._process_fn = self.dofn._process_argspec_fn()
     if type_hints.input_types:
       input_args, input_kwargs = type_hints.input_types
       self._input_hints = getcallargs_forhints(


[19/19] beam git commit: This closes #3103

Posted by al...@apache.org.
This closes #3103


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1cc32c65
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1cc32c65
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1cc32c65

Branch: refs/heads/release-2.0.0
Commit: 1cc32c65ea9f49c78d22999823884e1363a921d4
Parents: 65aa0ff 0f910b4
Author: Ahmet Altay <al...@google.com>
Authored: Thu May 11 17:11:38 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 17:11:38 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.py    |   2 +
 sdks/python/apache_beam/coders/coders.py        |  32 +++++-
 sdks/python/apache_beam/coders/coders_test.py   |  11 +-
 .../apache_beam/coders/coders_test_common.py    |   2 +-
 sdks/python/apache_beam/coders/observable.py    |   5 +-
 sdks/python/apache_beam/coders/slow_stream.py   |   5 +-
 .../apache_beam/coders/standard_coders_test.py  |   2 +-
 sdks/python/apache_beam/coders/stream.pyx       |   5 +
 sdks/python/apache_beam/coders/typecoders.py    |   3 +
 .../examples/complete/autocomplete_test.py      |   4 +-
 .../examples/complete/estimate_pi_test.py       |   4 +-
 .../complete/game/hourly_team_score_test.py     |   4 +-
 .../examples/complete/game/user_score_test.py   |   4 +-
 .../apache_beam/examples/complete/tfidf_test.py |   4 +-
 .../complete/top_wikipedia_sessions_test.py     |   4 +-
 .../cookbook/bigquery_side_input_test.py        |   4 +-
 .../cookbook/bigquery_tornadoes_test.py         |   6 +-
 .../examples/cookbook/coders_test.py            |   4 +-
 .../examples/cookbook/combiners_test.py         |   6 +-
 .../examples/cookbook/custom_ptransform_test.py |   4 +-
 .../examples/cookbook/filters_test.py           |  12 ++-
 .../examples/cookbook/mergecontacts.py          |  14 +--
 .../apache_beam/examples/snippets/snippets.py   |  17 +--
 .../examples/snippets/snippets_test.py          |  35 +++---
 .../apache_beam/examples/wordcount_debugging.py |   6 +-
 sdks/python/apache_beam/internal/__init__.py    |   2 +
 .../python/apache_beam/internal/gcp/__init__.py |   2 +
 sdks/python/apache_beam/io/avroio_test.py       |   4 +-
 sdks/python/apache_beam/io/concat_source.py     |   4 +-
 .../python/apache_beam/io/concat_source_test.py |   4 +-
 sdks/python/apache_beam/io/filebasedsource.py   |   2 +
 .../apache_beam/io/filebasedsource_test.py      |   4 +-
 sdks/python/apache_beam/io/fileio.py            |   7 ++
 sdks/python/apache_beam/io/fileio_test.py       |   2 +-
 sdks/python/apache_beam/io/filesystem.py        |   3 +
 sdks/python/apache_beam/io/filesystems.py       |   2 +
 .../io/gcp/datastore/v1/fake_datastore.py       |   6 +-
 .../apache_beam/io/gcp/datastore/v1/helper.py   |   5 +-
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py |   2 +
 sdks/python/apache_beam/io/gcp/gcsio.py         |   3 +
 sdks/python/apache_beam/io/gcp/pubsub.py        |   2 +
 .../io/gcp/tests/bigquery_matcher.py            |   3 +
 sdks/python/apache_beam/io/iobase.py            |   2 +
 sdks/python/apache_beam/io/localfilesystem.py   |   2 +
 sdks/python/apache_beam/io/range_trackers.py    |   7 +-
 sdks/python/apache_beam/io/source_test_utils.py |   8 ++
 sdks/python/apache_beam/io/sources_test.py      |   4 +-
 sdks/python/apache_beam/io/textio.py            |   2 +-
 sdks/python/apache_beam/io/textio_test.py       |   5 +-
 sdks/python/apache_beam/io/tfrecordio_test.py   |  24 +++--
 sdks/python/apache_beam/metrics/__init__.py     |   1 +
 sdks/python/apache_beam/metrics/cells.py        |   2 +
 sdks/python/apache_beam/metrics/execution.py    |   6 +-
 sdks/python/apache_beam/metrics/metric.py       |   4 +
 sdks/python/apache_beam/metrics/metricbase.py   |   2 +
 .../apache_beam/options/pipeline_options.py     |  20 +++-
 .../options/pipeline_options_validator.py       |   2 +
 .../apache_beam/options/value_provider.py       |   8 ++
 sdks/python/apache_beam/pipeline.py             |  13 ++-
 sdks/python/apache_beam/pipeline_test.py        |   4 +-
 sdks/python/apache_beam/pvalue.py               |  11 ++
 sdks/python/apache_beam/runners/api/__init__.py |   4 +-
 sdks/python/apache_beam/runners/common.py       |   5 +-
 .../apache_beam/runners/dataflow/__init__.py    |   9 ++
 .../runners/dataflow/dataflow_runner.py         |   9 +-
 .../runners/dataflow/dataflow_runner_test.py    |   5 +-
 .../runners/dataflow/test_dataflow_runner.py    |   3 +
 .../apache_beam/runners/direct/__init__.py      |   6 +-
 .../apache_beam/runners/direct/direct_runner.py |   3 +
 .../apache_beam/runners/direct/executor.py      |   2 +-
 .../runners/direct/transform_evaluator.py       |  10 +-
 .../apache_beam/runners/pipeline_context.py     |   6 ++
 .../apache_beam/runners/portability/__init__.py |   2 +
 .../portability/maptask_executor_runner.py      |   2 +-
 .../portability/maptask_executor_runner_test.py |   6 +-
 sdks/python/apache_beam/runners/runner.py       |   3 +
 sdks/python/apache_beam/runners/runner_test.py  |   4 +-
 .../apache_beam/runners/worker/__init__.py      |   2 +
 .../apache_beam/testing/pipeline_verifiers.py   |   8 ++
 .../python/apache_beam/testing/test_pipeline.py |   5 +
 sdks/python/apache_beam/testing/test_stream.py  |  14 ++-
 sdks/python/apache_beam/testing/test_utils.py   |   6 +-
 sdks/python/apache_beam/testing/util.py         | 107 +++++++++++++++++++
 sdks/python/apache_beam/testing/util_test.py    |  50 +++++++++
 sdks/python/apache_beam/transforms/__init__.py  |   2 +-
 .../apache_beam/transforms/combiners_test.py    |   2 +-
 sdks/python/apache_beam/transforms/core.py      |  50 ++++++---
 .../apache_beam/transforms/create_test.py       |   3 +-
 .../apache_beam/transforms/cy_combiners.py      |   5 +-
 .../python/apache_beam/transforms/ptransform.py |  29 +++--
 .../apache_beam/transforms/ptransform_test.py   |  13 +--
 .../python/apache_beam/transforms/sideinputs.py |   2 +
 .../apache_beam/transforms/sideinputs_test.py   |   2 +-
 sdks/python/apache_beam/transforms/timeutil.py  |   5 +
 sdks/python/apache_beam/transforms/trigger.py   |  14 +++
 .../apache_beam/transforms/trigger_test.py      |   2 +-
 sdks/python/apache_beam/transforms/util.py      |  79 --------------
 sdks/python/apache_beam/transforms/util_test.py |  50 ---------
 sdks/python/apache_beam/transforms/window.py    |  17 ++-
 .../apache_beam/transforms/window_test.py       |   2 +-
 .../transforms/write_ptransform_test.py         |   2 +-
 sdks/python/apache_beam/typehints/decorators.py |  17 ++-
 sdks/python/apache_beam/typehints/opcodes.py    |   2 +
 .../apache_beam/typehints/trivial_inference.py  |   2 +
 sdks/python/apache_beam/typehints/typecheck.py  |  17 +--
 .../typehints/typed_pipeline_test.py            |   2 +-
 sdks/python/apache_beam/typehints/typehints.py  |  18 ++++
 .../apache_beam/typehints/typehints_test.py     |  19 ++--
 sdks/python/apache_beam/utils/__init__.py       |   5 +-
 sdks/python/apache_beam/utils/annotations.py    |   4 +-
 sdks/python/apache_beam/utils/counters.py       |   5 +-
 sdks/python/apache_beam/utils/processes.py      |   6 +-
 sdks/python/apache_beam/utils/profiler.py       |   5 +-
 sdks/python/apache_beam/utils/proto_utils.py    |   2 +
 sdks/python/apache_beam/utils/retry.py          |   2 +
 sdks/python/apache_beam/utils/timestamp.py      |   5 +-
 sdks/python/apache_beam/utils/urns.py           |   2 +
 sdks/python/apache_beam/utils/windowed_value.py |   4 +-
 118 files changed, 724 insertions(+), 326 deletions(-)
----------------------------------------------------------------------



[02/19] beam git commit: fix lint error in fake_datastore.py

Posted by al...@apache.org.
fix lint error in fake_datastore.py


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dec27d8f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dec27d8f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dec27d8f

Branch: refs/heads/release-2.0.0
Commit: dec27d8f205f7a27e427ae9e451c4c875fa8cf04
Parents: 7fd012b
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Thu May 11 14:39:55 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/dec27d8f/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py b/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
index 0caf6d6..2332579 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
@@ -31,6 +31,7 @@ except ImportError:
   pass
 # pylint: enable=wrong-import-order, wrong-import-position
 
+
 def create_run_query(entities, batch_size):
   """A fake datastore run_query method that returns entities in batches.
 


[09/19] beam git commit: [BEAM-1340] Adds __all__ tags to classes in package apache_beam/io.

Posted by al...@apache.org.
[BEAM-1340] Adds __all__ tags to classes in package apache_beam/io.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0c784f95
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0c784f95
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0c784f95

Branch: refs/heads/release-2.0.0
Commit: 0c784f958f11eb0e448cc448d2b9d36fd90e9972
Parents: dec27d8
Author: chamikara@google.com <ch...@google.com>
Authored: Thu May 11 11:46:46 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/concat_source.py              | 4 +++-
 sdks/python/apache_beam/io/filebasedsource.py            | 2 ++
 sdks/python/apache_beam/io/fileio.py                     | 7 +++++++
 sdks/python/apache_beam/io/filesystem.py                 | 3 +++
 sdks/python/apache_beam/io/filesystems.py                | 2 ++
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py          | 2 ++
 sdks/python/apache_beam/io/gcp/gcsio.py                  | 3 +++
 sdks/python/apache_beam/io/gcp/pubsub.py                 | 2 ++
 sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py | 3 +++
 sdks/python/apache_beam/io/iobase.py                     | 2 ++
 sdks/python/apache_beam/io/localfilesystem.py            | 2 ++
 sdks/python/apache_beam/io/range_trackers.py             | 7 ++++++-
 sdks/python/apache_beam/io/source_test_utils.py          | 8 ++++++++
 13 files changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

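For readers unfamiliar with the convention, `__all__` is the standard Python mechanism these hunks rely on: only the listed names are bound by a star import, which pairs with the leading-underscore convention for internals. A tiny illustration (the module and class names are hypothetical):

    # mypackage/example_io.py  (hypothetical module)
    __all__ = ['PublicSource']

    class PublicSource(object):
      """Listed in __all__, so exported by 'from mypackage.example_io import *'."""

    class _InternalHelper(object):
      """Not listed, so a star import will not bind this name."""
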

http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/concat_source.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source.py b/sdks/python/apache_beam/io/concat_source.py
index dfd1695..56c4cca 100644
--- a/sdks/python/apache_beam/io/concat_source.py
+++ b/sdks/python/apache_beam/io/concat_source.py
@@ -15,7 +15,9 @@
 # limitations under the License.
 #
 
-"""Concat Source, which reads the union of several other sources.
+"""For internal use only; no backwards-compatibility guarantees.
+
+Concat Source, which reads the union of several other sources.
 """
 
 import bisect

http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 215e015..bb9efc4 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -38,6 +38,8 @@ from apache_beam.options.value_provider import check_accessible
 
 MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25
 
+__all__ = ['FileBasedSource']
+
 
 class FileBasedSource(iobase.BoundedSource):
   """A ``BoundedSource`` for reading a file glob of a given type."""

http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index ca3a759..aa18093 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -37,6 +37,8 @@ from apache_beam.options.value_provider import check_accessible
 
 DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN'
 
+__all__ = ['FileBasedSink']
+
 
 class FileSink(iobase.Sink):
   """A sink to a GCS or local files.
@@ -280,6 +282,11 @@ class FileSink(iobase.Sink):
     return type(self) == type(other) and self.__dict__ == other.__dict__
 
 
+# Using FileBasedSink for the public API to be symmetric with FileBasedSource.
+# TODO: move code from FileSink to here and delete that class.
+FileBasedSink = FileSink
+
+
 class FileSinkWriter(iobase.Writer):
   """The writer for FileSink.
   """

http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index 3d35f3e..db6a1d0 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -30,6 +30,9 @@ logger = logging.getLogger(__name__)
 
 DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
 
+__all__ = ['CompressionTypes', 'CompressedFile', 'FileMetadata', 'FileSystem',
+           'MatchResult']
+
 
 class CompressionTypes(object):
   """Enum-like class representing known compression types."""

http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/filesystems.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py
index 29f0644..e039686 100644
--- a/sdks/python/apache_beam/io/filesystems.py
+++ b/sdks/python/apache_beam/io/filesystems.py
@@ -33,6 +33,8 @@ except ImportError:
   pass
 # pylint: enable=wrong-import-position, unused-import
 
+__all__ = ['FileSystems']
+
 
 class FileSystems(object):
   """A class that defines the functions that can be performed on a filesystem.

http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index dc71fce..ce8b5e6 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -26,6 +26,8 @@ from apache_beam.io.filesystem import FileSystem
 from apache_beam.io.filesystem import MatchResult
 from apache_beam.io.gcp import gcsio
 
+__all__ = ['GCSFileSystem']
+
 
 class GCSFileSystem(FileSystem):
   """A GCS ``FileSystem`` implementation for accessing files on GCS.

http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/gcp/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index c76c99d..774ee54 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -34,6 +34,9 @@ import traceback
 
 from apache_beam.utils import retry
 
+__all__ = ['GcsIO']
+
+
 # Issue a friendlier error message if the storage library is not available.
 # TODO(silviuc): Remove this guard when storage is available everywhere.
 try:

http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/gcp/pubsub.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index efc628d..103fce0 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -26,6 +26,8 @@ from apache_beam import coders
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
 from apache_beam.transforms.display import DisplayDataItem
 
+__all__ = ['PubSubSink', 'PubSubSource']
+
 
 class PubSubSource(dataflow_io.NativeSource):
   """Source for reading from a given Cloud Pub/Sub topic.

http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index f42b70f..844cbc5 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -24,6 +24,9 @@ from hamcrest.core.base_matcher import BaseMatcher
 from apache_beam.testing.test_utils import compute_hash
 from apache_beam.utils import retry
 
+__all__ = ['BigqueryMatcher']
+
+
 # Protect against environments where bigquery library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
 try:

http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index e8ffb72..a80b12f 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -46,6 +46,8 @@ from apache_beam.transforms.display import HasDisplayData
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.utils.windowed_value import WindowedValue
 
+__all__ = ['BoundedSource', 'RangeTracker', 'Read', 'Sink', 'Write', 'Writer']
+
 
 # Encapsulates information about a bundle of a source generated when method
 # BoundedSource.split() is invoked.

http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/localfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index c670704..b08ac49 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -29,6 +29,8 @@ from apache_beam.io.filesystem import FileMetadata
 from apache_beam.io.filesystem import FileSystem
 from apache_beam.io.filesystem import MatchResult
 
+__all__ = ['LocalFileSystem']
+
 
 class LocalFileSystem(FileSystem):
   """A Local ``FileSystem`` implementation for accessing files on disk.

http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/range_trackers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index 000df81..9cb36e7 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -24,6 +24,9 @@ import threading
 
 from apache_beam.io import iobase
 
+__all__ = ['OffsetRangeTracker', 'LexicographicKeyRangeTracker',
+           'OrderedPositionRangeTracker', 'UnsplittableRangeTracker']
+
 
 class OffsetRangeTracker(iobase.RangeTracker):
   """A 'RangeTracker' for non-negative positions of type 'long'."""
@@ -191,7 +194,9 @@ class OffsetRangeTracker(iobase.RangeTracker):
 
 
 class GroupedShuffleRangeTracker(iobase.RangeTracker):
-  """A 'RangeTracker' for positions used by'GroupedShuffleReader'.
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A 'RangeTracker' for positions used by 'GroupedShuffleReader'.
 
   These positions roughly correspond to hashes of keys. In case of hash
   collisions, multiple groups can have the same position. In that case, the

http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/source_test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py
index edb6409..a144a8a 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -52,6 +52,14 @@ import weakref
 from multiprocessing.pool import ThreadPool
 from apache_beam.io import iobase
 
+__all__ = ['read_from_source', 'assert_sources_equal_reference_source',
+           'assert_reentrant_reads_succeed',
+           'assert_split_at_fraction_behavior',
+           'assert_split_at_fraction_binary',
+           'assert_split_at_fraction_exhaustive',
+           'assert_split_at_fraction_fails',
+           'assert_split_at_fraction_succeeds_and_consistent']
+
 
 class ExpectedSplitOutcome(object):
   MUST_SUCCEED_AND_BE_CONSISTENT = 1


[07/19] beam git commit: Add internal comments to metrics

Posted by al...@apache.org.
Add internal comments to metrics


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a6543abb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a6543abb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a6543abb

Branch: refs/heads/release-2.0.0
Commit: a6543abb37e65fe6c7afcaf3ea670c75aee1f7d0
Parents: 6d02da0
Author: Ahmet Altay <al...@google.com>
Authored: Thu May 11 14:50:33 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/metrics/__init__.py   | 1 +
 sdks/python/apache_beam/metrics/cells.py      | 2 ++
 sdks/python/apache_beam/metrics/execution.py  | 6 ++----
 sdks/python/apache_beam/metrics/metric.py     | 4 ++++
 sdks/python/apache_beam/metrics/metricbase.py | 2 ++
 5 files changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a6543abb/sdks/python/apache_beam/metrics/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/__init__.py b/sdks/python/apache_beam/metrics/__init__.py
index 164d1a8..8ce7bbb 100644
--- a/sdks/python/apache_beam/metrics/__init__.py
+++ b/sdks/python/apache_beam/metrics/__init__.py
@@ -15,3 +15,4 @@
 # limitations under the License.
 #
 from apache_beam.metrics.metric import Metrics
+from apache_beam.metrics.metric import MetricsFilter

http://git-wip-us.apache.org/repos/asf/beam/blob/a6543abb/sdks/python/apache_beam/metrics/cells.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py
index fbe3ad3..ba840f7 100644
--- a/sdks/python/apache_beam/metrics/cells.py
+++ b/sdks/python/apache_beam/metrics/cells.py
@@ -29,6 +29,8 @@ import threading
 from apache_beam.metrics.metricbase import Counter
 from apache_beam.metrics.metricbase import Distribution
 
+__all__ = ['DistributionResult']
+
 
 class CellCommitState(object):
   """For internal use only; no backwards-compatibility guarantees.

http://git-wip-us.apache.org/repos/asf/beam/blob/a6543abb/sdks/python/apache_beam/metrics/execution.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py
index a06ec0c..675e49c 100644
--- a/sdks/python/apache_beam/metrics/execution.py
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -24,7 +24,7 @@ Available classes:
 
 - MetricKey - Internal key for a metric.
 - MetricResult - Current status of a metric's updates/commits.
-- MetricsEnvironment - Keeps track of MetricsContainer and other metrics
+- _MetricsEnvironment - Keeps track of MetricsContainer and other metrics
     information for every single execution working thread.
 - MetricsContainer - Holds the metrics of a single step and a single
     unit-of-commit (bundle).
@@ -36,9 +36,7 @@ from apache_beam.metrics.cells import CounterCell, DistributionCell
 
 
 class MetricKey(object):
-  """
-
-  Key used to identify instance of metric cell.
+  """Key used to identify instance of metric cell.
 
   Metrics are internally keyed by the step name they associated with and
   the name of the metric.

http://git-wip-us.apache.org/repos/asf/beam/blob/a6543abb/sdks/python/apache_beam/metrics/metric.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py
index 33db4e1..f99c0c4 100644
--- a/sdks/python/apache_beam/metrics/metric.py
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -30,6 +30,8 @@ from apache_beam.metrics.execution import MetricsEnvironment
 from apache_beam.metrics.metricbase import Counter, Distribution
 from apache_beam.metrics.metricbase import MetricName
 
+__all__ = ['Metrics', 'MetricsFilter']
+
 
 class Metrics(object):
   """Lets users create/access metric objects during pipeline execution."""
@@ -146,6 +148,8 @@ class MetricResults(object):
 class MetricsFilter(object):
   """Simple object to filter metrics results.
 
+  This class is experimental. No backwards-compatibility guarantees.
+
   It filters by matching a result's step-namespace-name with three internal
   sets. No execution/matching logic is added to this object, so that it may
   be used to construct arguments as an RPC request. It is left for runners

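A minimal sketch of the surface that stays public here, using the usual counter-in-a-DoFn pattern (the DoFn name and threshold are made up):

    import apache_beam as beam
    from apache_beam.metrics import Metrics

    class CountShortWords(beam.DoFn):
      def __init__(self):
        super(CountShortWords, self).__init__()
        self.short_words = Metrics.counter(self.__class__, 'short_words')

      def process(self, element):
        if len(element) < 4:
          self.short_words.inc()
        yield element

After the pipeline runs, the counter can be queried from the pipeline result using a MetricsFilter scoped by step, namespace, or name.
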
http://git-wip-us.apache.org/repos/asf/beam/blob/a6543abb/sdks/python/apache_beam/metrics/metricbase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/metricbase.py b/sdks/python/apache_beam/metrics/metricbase.py
index fa0ca75..699f29c 100644
--- a/sdks/python/apache_beam/metrics/metricbase.py
+++ b/sdks/python/apache_beam/metrics/metricbase.py
@@ -30,6 +30,8 @@ Available classes:
 - MetricName - Namespace and name used to refer to a Metric.
 """
 
+__all__ = ['Metric', 'Counter', 'Distribution', 'MetricName']
+
 
 class MetricName(object):
   """The name of a metric.


[10/19] beam git commit: [BEAM-1345] Clearly delineate public API in apache_beam/options

Posted by al...@apache.org.
[BEAM-1345] Clearly delineate public API in apache_beam/options


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6d77f958
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6d77f958
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6d77f958

Branch: refs/heads/release-2.0.0
Commit: 6d77f958de666ccf5f59e907c292efbc9272b49b
Parents: 0a0cc2d
Author: Charles Chen <cc...@google.com>
Authored: Thu May 11 13:31:18 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/options/pipeline_options.py     | 20 +++++++++++++++++---
 .../options/pipeline_options_validator.py       |  2 ++
 .../apache_beam/options/value_provider.py       |  8 ++++++++
 3 files changed, 27 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6d77f958/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index b79d85d..983d128 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -25,6 +25,20 @@ from apache_beam.options.value_provider import RuntimeValueProvider
 from apache_beam.options.value_provider import ValueProvider
 
 
+__all__ = [
+    'PipelineOptions',
+    'StandardOptions',
+    'TypeOptions',
+    'DirectOptions',
+    'GoogleCloudOptions',
+    'WorkerOptions',
+    'DebugOptions',
+    'ProfilingOptions',
+    'SetupOptions',
+    'TestOptions',
+    ]
+
+
 def _static_value_provider_of(value_type):
   """"Helper function to plug a ValueProvider into argparse.
 
@@ -42,7 +56,7 @@ def _static_value_provider_of(value_type):
   return _f
 
 
-class BeamArgumentParser(argparse.ArgumentParser):
+class _BeamArgumentParser(argparse.ArgumentParser):
   """An ArgumentParser that supports ValueProvider options.
 
   Example Usage::
@@ -133,7 +147,7 @@ class PipelineOptions(HasDisplayData):
     """
     self._flags = flags
     self._all_options = kwargs
-    parser = BeamArgumentParser()
+    parser = _BeamArgumentParser()
 
     for cls in type(self).mro():
       if cls == PipelineOptions:
@@ -187,7 +201,7 @@ class PipelineOptions(HasDisplayData):
     # TODO(BEAM-1319): PipelineOption sub-classes in the main session might be
     # repeated. Pick last unique instance of each subclass to avoid conflicts.
     subset = {}
-    parser = BeamArgumentParser()
+    parser = _BeamArgumentParser()
     for cls in PipelineOptions.__subclasses__():
       subset[str(cls)] = cls
     for cls in subset.values():

http://git-wip-us.apache.org/repos/asf/beam/blob/6d77f958/sdks/python/apache_beam/options/pipeline_options_validator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options_validator.py b/sdks/python/apache_beam/options/pipeline_options_validator.py
index 5c1ce2a..24d2e55 100644
--- a/sdks/python/apache_beam/options/pipeline_options_validator.py
+++ b/sdks/python/apache_beam/options/pipeline_options_validator.py
@@ -16,6 +16,8 @@
 #
 
 """Pipeline options validator.
+
+For internal use only; no backwards-compatibility guarantees.
 """
 import re
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6d77f958/sdks/python/apache_beam/options/value_provider.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/value_provider.py b/sdks/python/apache_beam/options/value_provider.py
index c00d7bc..40bddba 100644
--- a/sdks/python/apache_beam/options/value_provider.py
+++ b/sdks/python/apache_beam/options/value_provider.py
@@ -24,6 +24,14 @@ from functools import wraps
 from apache_beam import error
 
 
+__all__ = [
+    'ValueProvider',
+    'StaticValueProvider',
+    'RuntimeValueProvider',
+    'check_accessible',
+    ]
+
+
 class ValueProvider(object):
   def is_accessible(self):
     raise NotImplementedError(

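A short sketch tying the now-public pieces together (WordCountOptions and the input path are hypothetical): options subclasses register arguments through `_add_argparse_args`, and `add_value_provider_argument` defers resolution of the value until execution time.

    from apache_beam.options.pipeline_options import PipelineOptions

    class WordCountOptions(PipelineOptions):
      @classmethod
      def _add_argparse_args(cls, parser):
        # Deferred option: resolved through a ValueProvider at run time.
        parser.add_value_provider_argument('--input', type=str)

    options = PipelineOptions(['--input', 'gs://some-bucket/input.txt'])
    input_option = options.view_as(WordCountOptions).input
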

[04/19] beam git commit: [BEAM-1345] Clearly delineate public api in runners package.

Posted by al...@apache.org.
[BEAM-1345] Clearly delineate public api in runners package.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aeeefc17
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aeeefc17
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aeeefc17

Branch: refs/heads/release-2.0.0
Commit: aeeefc1725bca11f765f57bf2833d606a0e6d6ba
Parents: 6d77f95
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu May 11 13:41:24 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/api/__init__.py             | 4 +++-
 sdks/python/apache_beam/runners/common.py                   | 5 ++++-
 sdks/python/apache_beam/runners/dataflow/__init__.py        | 9 +++++++++
 sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 3 +++
 .../apache_beam/runners/dataflow/test_dataflow_runner.py    | 3 +++
 sdks/python/apache_beam/runners/direct/__init__.py          | 6 +++++-
 sdks/python/apache_beam/runners/direct/direct_runner.py     | 3 +++
 sdks/python/apache_beam/runners/pipeline_context.py         | 6 ++++++
 sdks/python/apache_beam/runners/portability/__init__.py     | 2 ++
 sdks/python/apache_beam/runners/runner.py                   | 3 +++
 sdks/python/apache_beam/runners/worker/__init__.py          | 2 ++
 11 files changed, 43 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/api/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/api/__init__.py b/sdks/python/apache_beam/runners/api/__init__.py
index e94673c..bf95208 100644
--- a/sdks/python/apache_beam/runners/api/__init__.py
+++ b/sdks/python/apache_beam/runners/api/__init__.py
@@ -15,7 +15,9 @@
 # limitations under the License.
 #
 
-"""Checked in to avoid protoc dependency for Python development.
+"""For internal use only; no backwards-compatibility guarantees.
+
+Checked in to avoid protoc dependency for Python development.
 
 Regenerate files with::
 

http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 86db711..8453569 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -17,7 +17,10 @@
 
 # cython: profile=True
 
-"""Worker operations executor."""
+"""Worker operations executor.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
 
 import sys
 import traceback

http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/dataflow/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/__init__.py b/sdks/python/apache_beam/runners/dataflow/__init__.py
index cce3aca..6674ba5 100644
--- a/sdks/python/apache_beam/runners/dataflow/__init__.py
+++ b/sdks/python/apache_beam/runners/dataflow/__init__.py
@@ -14,3 +14,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
+"""The DataflowRunner executes pipelines on Google Cloud Dataflow.
+
+Anything in this package not imported here is an internal implementation detail
+with no backwards-compatibility guarantees.
+"""
+
+from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
+from apache_beam.runners.dataflow.test_dataflow_runner import TestDataflowRunner

http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 796a67b..3d8437c 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -46,6 +46,9 @@ from apache_beam.typehints import typehints
 from apache_beam.options.pipeline_options import StandardOptions
 
 
+__all__ = ['DataflowRunner']
+
+
 class DataflowRunner(PipelineRunner):
   """A runner that creates job graphs and submits them for remote execution.
 

http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index 4fd1026..b339882 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -22,6 +22,9 @@ from apache_beam.options.pipeline_options import TestOptions, GoogleCloudOptions
 from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
 
 
+__all__ = ['TestDataflowRunner']
+
+
 class TestDataflowRunner(DataflowRunner):
   def run(self, pipeline):
     """Execute test pipeline and verify test matcher"""

http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/direct/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/__init__.py b/sdks/python/apache_beam/runners/direct/__init__.py
index 0d64513..0f82756 100644
--- a/sdks/python/apache_beam/runners/direct/__init__.py
+++ b/sdks/python/apache_beam/runners/direct/__init__.py
@@ -15,5 +15,9 @@
 # limitations under the License.
 #
 
-"""Inprocess runner executes pipelines locally in a single process."""
+"""Inprocess runner executes pipelines locally in a single process.
+
+Anything in this package not imported here is an internal implementation detail
+with no backwards-compatibility guarantees.
+"""
 from apache_beam.runners.direct.direct_runner import DirectRunner

http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 535aac3..ecf5114 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -36,6 +36,9 @@ from apache_beam.options.pipeline_options import DirectOptions
 from apache_beam.options.value_provider import RuntimeValueProvider
 
 
+__all__ = ['DirectRunner']
+
+
 class DirectRunner(PipelineRunner):
   """Executes a single pipeline on the local machine."""
 
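As a usage note (a sketch; the transform chain is arbitrary), runners are normally selected by name through pipeline options rather than by importing runner internals directly, which is what keeping only DirectRunner and DataflowRunner public supports:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(['--runner', 'DirectRunner'])
    p = beam.Pipeline(options=options)
    _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x)
    p.run().wait_until_finish()
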

http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/pipeline_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
index d3d3c24..1c89d06 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -15,6 +15,12 @@
 # limitations under the License.
 #
 
+"""Utility class for serializing pipelines via the runner API.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
+
+
 from apache_beam import pipeline
 from apache_beam import pvalue
 from apache_beam import coders

http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/portability/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/__init__.py b/sdks/python/apache_beam/runners/portability/__init__.py
index cce3aca..7af93ed 100644
--- a/sdks/python/apache_beam/runners/portability/__init__.py
+++ b/sdks/python/apache_beam/runners/portability/__init__.py
@@ -14,3 +14,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
+"""This runner is experimental; no backwards-compatibility guarantees."""

http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index d875fdc..af00d8f 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -26,6 +26,9 @@ import shutil
 import tempfile
 
 
+__all__ = ['PipelineRunner', 'PipelineState', 'PipelineResult']
+
+
 def _get_runner_map(runner_names, module_path):
   """Create a map of runner name in lower case to full import path to the
   runner class.
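
A minimal sketch of how the three public names above fit together once a
pipeline has been submitted (the pipeline itself is illustrative):

    # A PipelineRunner's run() yields a PipelineResult whose terminal
    # status is one of the PipelineState constants.
    import apache_beam as beam
    from apache_beam.runners.runner import PipelineState

    p = beam.Pipeline('DirectRunner')
    p | beam.Create(['a', 'b', 'c'])
    result = p.run()              # a PipelineResult
    result.wait_until_finish()
    assert result.state == PipelineState.DONE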

http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/worker/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/__init__.py b/sdks/python/apache_beam/runners/worker/__init__.py
index cce3aca..0bce5d6 100644
--- a/sdks/python/apache_beam/runners/worker/__init__.py
+++ b/sdks/python/apache_beam/runners/worker/__init__.py
@@ -14,3 +14,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
+"""For internal use only; no backwards-compatibility guarantees."""


[15/19] beam git commit: [BEAM-1345] Annotate public members of pvalue.

Posted by al...@apache.org.
[BEAM-1345] Annotate public members of pvalue.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0da682d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0da682d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0da682d

Branch: refs/heads/release-2.0.0
Commit: d0da682d5e981217c90571febdff728b8abbbc14
Parents: 2bb95fd
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu May 11 12:07:00 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:37 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pvalue.py | 11 +++++++++++
 1 file changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d0da682d/sdks/python/apache_beam/pvalue.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index fa91fe3..7385e82 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -31,6 +31,17 @@ import itertools
 from apache_beam import typehints
 
 
+__all__ = [
+    'PCollection',
+    'TaggedOutput',
+    'AsSingleton',
+    'AsIter',
+    'AsList',
+    'AsDict',
+    'EmptySideInput',
+]
+
+
 class PValue(object):
   """Base class for PCollection.
 


[11/19] beam git commit: [BEAM-1345] Mark windowed value as experimental

Posted by al...@apache.org.
[BEAM-1345] Mark windowed value as experimental


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5b095118
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5b095118
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5b095118

Branch: refs/heads/release-2.0.0
Commit: 5b09511873136cc6aff8c75f3767c2d81e288940
Parents: 9730f56
Author: Sourabh Bajaj <so...@google.com>
Authored: Thu May 11 11:58:08 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/utils/windowed_value.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5b095118/sdks/python/apache_beam/utils/windowed_value.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py
index 87c26d1..be27854 100644
--- a/sdks/python/apache_beam/utils/windowed_value.py
+++ b/sdks/python/apache_beam/utils/windowed_value.py
@@ -16,10 +16,12 @@
 #
 
 """Core windowing data structures.
+
+This module is experimental. No backwards-compatibility guarantees.
 """
 
 # This module is carefully crafted to have optimal performance when
-# compiled whiel still being valid Python.  Care needs to be taken when
+# compiled while still being valid Python.  Care needs to be taken when
 # editing this file as WindowedValues are created for every element for
 # every step in a Beam pipeline.
 


[06/19] beam git commit: [BEAM-1345] Clearly delineate public api in apache_beam/coders.

Posted by al...@apache.org.
[BEAM-1345] Clearly delineate public api in apache_beam/coders.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7fd012b6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7fd012b6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7fd012b6

Branch: refs/heads/release-2.0.0
Commit: 7fd012b63f7ff8b10d80bce53a860ad7102673d6
Parents: 0ce2543
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed May 10 17:13:06 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.py    |  2 ++
 sdks/python/apache_beam/coders/coders.py        | 32 +++++++++++++++++---
 sdks/python/apache_beam/coders/coders_test.py   | 11 ++++---
 .../apache_beam/coders/coders_test_common.py    |  2 +-
 sdks/python/apache_beam/coders/observable.py    |  5 ++-
 sdks/python/apache_beam/coders/slow_stream.py   |  5 ++-
 .../apache_beam/coders/standard_coders_test.py  |  2 +-
 sdks/python/apache_beam/coders/stream.pyx       |  5 +++
 sdks/python/apache_beam/coders/typecoders.py    |  3 ++
 .../examples/snippets/snippets_test.py          |  3 +-
 sdks/python/apache_beam/io/fileio_test.py       |  2 +-
 sdks/python/apache_beam/io/textio.py            |  2 +-
 sdks/python/apache_beam/transforms/window.py    |  2 +-
 13 files changed, 59 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index a0496a2..10298bf 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -23,6 +23,8 @@ encode many elements with minimal overhead.
 
 This module may be optionally compiled with Cython, using the corresponding
 coder_impl.pxd file for type hints.
+
+For internal use only; no backwards-compatibility guarantees.
 """
 from types import NoneType
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 4f75182..ce914dd 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -15,7 +15,10 @@
 # limitations under the License.
 #
 
-"""Collection of useful coders."""
+"""Collection of useful coders.
+
+Only those coders listed in __all__ are part of the public API of this module.
+"""
 
 import base64
 import cPickle as pickle
@@ -45,6 +48,13 @@ except ImportError:
   import dill
 
 
+__all__ = ['Coder',
+           'BytesCoder', 'DillCoder', 'FastPrimitivesCoder', 'FloatCoder',
+           'IterableCoder', 'PickleCoder', 'ProtoCoder', 'SingletonCoder',
+           'StrUtf8Coder', 'TimestampCoder', 'TupleCoder',
+           'TupleSequenceCoder', 'VarIntCoder', 'WindowedValueCoder']
+
+
 def serialize_coder(coder):
   from apache_beam.internal import pickler
   return '%s$%s' % (coder.__class__.__name__, pickler.dumps(coder))
@@ -116,6 +126,10 @@ class Coder(object):
                                         self.estimate_size)
 
   def get_impl(self):
+    """For internal use only; no backwards-compatibility guarantees.
+
+    Returns the CoderImpl backing this Coder.
+    """
     if not hasattr(self, '_impl'):
       self._impl = self._create_impl()
       assert isinstance(self._impl, coder_impl.CoderImpl)
@@ -152,13 +166,17 @@ class Coder(object):
       raise ValueError('Not a KV coder: %s.' % self)
 
   def _get_component_coders(self):
-    """Returns the internal component coders of this coder."""
+    """For internal use only; no backwards-compatibility guarantees.
+
+    Returns the internal component coders of this coder."""
     # This is an internal detail of the Coder API and does not need to be
     # refined in user-defined Coders.
     return []
 
   def as_cloud_object(self):
-    """Returns Google Cloud Dataflow API description of this coder."""
+    """For internal use only; no backwards-compatibility guarantees.
+
+    Returns Google Cloud Dataflow API description of this coder."""
     # This is an internal detail of the Coder API and does not need to be
     # refined in user-defined Coders.
 
@@ -184,6 +202,8 @@ class Coder(object):
     # pylint: enable=protected-access
 
   def to_runner_api(self, context):
+    """For internal use only; no backwards-compatibility guarantees.
+    """
     # TODO(BEAM-115): Use specialized URNs and components.
     from apache_beam.runners.api import beam_runner_api_pb2
     return beam_runner_api_pb2.Coder(
@@ -196,6 +216,8 @@ class Coder(object):
 
   @staticmethod
   def from_runner_api(proto, context):
+    """For internal use only; no backwards-compatibility guarantees.
+    """
     any_proto = proto.spec.spec.parameter
     bytes_proto = google.protobuf.wrappers_pb2.BytesValue()
     any_proto.Unpack(bytes_proto)
@@ -779,7 +801,9 @@ class WindowedValueCoder(FastCoder):
 
 
 class LengthPrefixCoder(FastCoder):
-  """Coder which prefixes the length of the encoded object in the stream."""
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Coder which prefixes the length of the encoded object in the stream."""
 
   def __init__(self, value_coder):
     self._value_coder = value_coder
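
A minimal sketch of the public Coder surface this split leaves for user code:
a custom coder only overrides encode/decode (and optionally is_deterministic);
the methods flagged above as internal are not part of that contract. The coder
below is hypothetical:

    from apache_beam.coders import coders

    class KVStringCoder(coders.Coder):  # hypothetical user-defined coder
      """Encodes a (key, value) pair of strings as 'key,value'."""

      def encode(self, value):
        return '%s,%s' % value

      def decode(self, encoded):
        key, val = encoded.split(',', 1)
        return (key, val)

      def is_deterministic(self):
        return True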

http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/coders/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py
index 575503b..c89e810 100644
--- a/sdks/python/apache_beam/coders/coders_test.py
+++ b/sdks/python/apache_beam/coders/coders_test.py
@@ -20,8 +20,9 @@ import base64
 import logging
 import unittest
 
-from apache_beam import coders
+from apache_beam.coders import coders
 from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
+from apache_beam.coders.typecoders import registry as coders_registry
 
 
 class PickleCoderTest(unittest.TestCase):
@@ -46,13 +47,13 @@ class PickleCoderTest(unittest.TestCase):
 class CodersTest(unittest.TestCase):
 
   def test_str_utf8_coder(self):
-    real_coder = coders.registry.get_coder(str)
+    real_coder = coders_registry.get_coder(str)
     expected_coder = coders.BytesCoder()
     self.assertEqual(
         real_coder.encode('abc'), expected_coder.encode('abc'))
     self.assertEqual('abc', real_coder.decode(real_coder.encode('abc')))
 
-    real_coder = coders.registry.get_coder(bytes)
+    real_coder = coders_registry.get_coder(bytes)
     expected_coder = coders.BytesCoder()
     self.assertEqual(
         real_coder.encode('abc'), expected_coder.encode('abc'))
@@ -82,7 +83,7 @@ class ProtoCoderTest(unittest.TestCase):
     mb.field1 = True
     ma.field1 = u'hello world'
     expected_coder = coders.ProtoCoder(ma.__class__)
-    real_coder = coders.registry.get_coder(ma.__class__)
+    real_coder = coders_registry.get_coder(ma.__class__)
     self.assertEqual(expected_coder, real_coder)
     self.assertEqual(real_coder.encode(ma), expected_coder.encode(ma))
     self.assertEqual(ma, real_coder.decode(real_coder.encode(ma)))
@@ -104,7 +105,7 @@ class FallbackCoderTest(unittest.TestCase):
   def test_default_fallback_path(self):
     """Test fallback path picks a matching coder if no coder is registered."""
 
-    coder = coders.registry.get_coder(DummyClass)
+    coder = coders_registry.get_coder(DummyClass)
     # No matching coder, so picks the last fallback coder which is a
     # FastPrimitivesCoder.
     self.assertEqual(coder, coders.FastPrimitivesCoder())

http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index e5bfe35..c9b67b3 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -23,12 +23,12 @@ import unittest
 
 import dill
 
-import coders
 import observable
 from apache_beam.transforms import window
 from apache_beam.utils import timestamp
 from apache_beam.utils import windowed_value
 
+from apache_beam.coders import coders
 from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/coders/observable.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/observable.py b/sdks/python/apache_beam/coders/observable.py
index 5a808d8..fc952cf 100644
--- a/sdks/python/apache_beam/coders/observable.py
+++ b/sdks/python/apache_beam/coders/observable.py
@@ -16,7 +16,10 @@
 #
 
 
-"""Observable base class for iterables."""
+"""Observable base class for iterables.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
 
 
 class ObservableMixin(object):

http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/coders/slow_stream.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py
index 85837bc..1ab55d9 100644
--- a/sdks/python/apache_beam/coders/slow_stream.py
+++ b/sdks/python/apache_beam/coders/slow_stream.py
@@ -15,7 +15,10 @@
 # limitations under the License.
 #
 
-"""A pure Python implementation of stream.pyx."""
+"""A pure Python implementation of stream.pyx.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
 
 import struct
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/coders/standard_coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index 885e88f..5f98455 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -26,7 +26,7 @@ import unittest
 
 import yaml
 
-from apache_beam import coders
+from apache_beam.coders import coders
 from apache_beam.coders import coder_impl
 from apache_beam.utils import windowed_value
 from apache_beam.utils.timestamp import Timestamp

http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/coders/stream.pyx
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/stream.pyx b/sdks/python/apache_beam/coders/stream.pyx
index ae24418..8d97681 100644
--- a/sdks/python/apache_beam/coders/stream.pyx
+++ b/sdks/python/apache_beam/coders/stream.pyx
@@ -15,6 +15,11 @@
 # limitations under the License.
 #
 
+"""Compiled version of the Stream objects used by CoderImpl.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
+
 cimport libc.stdlib
 cimport libc.string
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/coders/typecoders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py
index 60832c9..3894bb5 100644
--- a/sdks/python/apache_beam/coders/typecoders.py
+++ b/sdks/python/apache_beam/coders/typecoders.py
@@ -71,6 +71,9 @@ from apache_beam.coders import coders
 from apache_beam.typehints import typehints
 
 
+__all__ = ['registry']
+
+
 class CoderRegistry(object):
   """A coder registry for typehint/coder associations."""
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 85d8bde..211da24 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -29,6 +29,7 @@ import apache_beam as beam
 from apache_beam import coders
 from apache_beam import pvalue
 from apache_beam import typehints
+from apache_beam.coders.coders import ToStringCoder
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 from apache_beam.options.pipeline_options import PipelineOptions
@@ -422,7 +423,7 @@ class SnippetsTest(unittest.TestCase):
       def __init__(self, file_to_write):
         self.file_to_write = file_to_write
         self.file_obj = None
-        self.coder = coders.ToStringCoder()
+        self.coder = ToStringCoder()
 
       def start_bundle(self):
         assert self.file_to_write

http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 4c25505..b92b8be 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -29,7 +29,7 @@ import hamcrest as hc
 import mock
 
 import apache_beam as beam
-from apache_beam import coders
+from apache_beam.coders import coders
 from apache_beam.io import fileio
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayData

http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 750ec45..d43f4fc 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -21,7 +21,7 @@
 from __future__ import absolute_import
 import logging
 
-from apache_beam import coders
+from apache_beam.coders import coders
 from apache_beam.io import filebasedsource
 from apache_beam.io import fileio
 from apache_beam.io import iobase

http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 29994c0..6d0db3a 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -53,7 +53,7 @@ import abc
 
 from google.protobuf import struct_pb2
 
-from apache_beam import coders
+from apache_beam.coders import coders
 from apache_beam.runners.api import beam_runner_api_pb2
 from apache_beam.transforms import timeutil
 from apache_beam.utils import proto_utils