You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/05/12 00:11:55 UTC
[01/19] beam git commit: [BEAM-1345] Clearly delineate public api in
apache_beam/typehints.
Repository: beam
Updated Branches:
refs/heads/release-2.0.0 65aa0ffd3 -> 1cc32c65e
[BEAM-1345] Clearly delineate public api in apache_beam/typehints.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/36fcd36c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/36fcd36c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/36fcd36c
Branch: refs/heads/release-2.0.0
Commit: 36fcd36cb779592bc3964d4949a70df5a1cc1428
Parents: a6543ab
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu May 11 12:54:13 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 2 +-
sdks/python/apache_beam/transforms/core.py | 8 ++++----
sdks/python/apache_beam/transforms/ptransform.py | 14 +++++++-------
sdks/python/apache_beam/typehints/decorators.py | 17 +++++++++++++----
sdks/python/apache_beam/typehints/opcodes.py | 2 ++
.../apache_beam/typehints/trivial_inference.py | 2 ++
sdks/python/apache_beam/typehints/typecheck.py | 15 +++++++++------
sdks/python/apache_beam/typehints/typehints.py | 18 ++++++++++++++++++
.../apache_beam/typehints/typehints_test.py | 19 ++++++++++---------
9 files changed, 66 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/36fcd36c/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index ec8dde4..79480d7 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -53,11 +53,11 @@ import shutil
import tempfile
from apache_beam import pvalue
-from apache_beam import typehints
from apache_beam.internal import pickler
from apache_beam.runners import create_runner
from apache_beam.runners import PipelineRunner
from apache_beam.transforms import ptransform
+from apache_beam.typehints import typehints
from apache_beam.typehints import TypeCheckError
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
http://git-wip-us.apache.org/repos/asf/beam/blob/36fcd36c/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index a1964cf..abe699f 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -40,15 +40,15 @@ from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.window import WindowFn
from apache_beam.typehints import Any
-from apache_beam.typehints import get_type_hints
-from apache_beam.typehints import is_consistent_with
from apache_beam.typehints import Iterable
from apache_beam.typehints import KV
from apache_beam.typehints import trivial_inference
-from apache_beam.typehints import TypeCheckError
from apache_beam.typehints import Union
-from apache_beam.typehints import WithTypeHints
+from apache_beam.typehints.decorators import get_type_hints
+from apache_beam.typehints.decorators import TypeCheckError
+from apache_beam.typehints.decorators import WithTypeHints
from apache_beam.typehints.trivial_inference import element_type
+from apache_beam.typehints.typehints import is_consistent_with
from apache_beam.utils import urns
from apache_beam.options.pipeline_options import TypeOptions
http://git-wip-us.apache.org/repos/asf/beam/blob/36fcd36c/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index d1f9835..8898c36 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -46,16 +46,16 @@ from google.protobuf import wrappers_pb2
from apache_beam import error
from apache_beam import pvalue
-from apache_beam import typehints
from apache_beam.internal import pickler
from apache_beam.internal import util
from apache_beam.transforms.display import HasDisplayData
from apache_beam.transforms.display import DisplayDataItem
-from apache_beam.typehints import getcallargs_forhints
-from apache_beam.typehints import TypeCheckError
-from apache_beam.typehints import validate_composite_type_param
-from apache_beam.typehints import WithTypeHints
+from apache_beam.typehints import typehints
+from apache_beam.typehints.decorators import getcallargs_forhints
+from apache_beam.typehints.decorators import TypeCheckError
+from apache_beam.typehints.decorators import WithTypeHints
from apache_beam.typehints.trivial_inference import instance_to_type
+from apache_beam.typehints.typehints import validate_composite_type_param
from apache_beam.utils import proto_utils
from apache_beam.utils import urns
@@ -491,7 +491,7 @@ class PTransformWithSideInputs(PTransform):
"""
def __init__(self, fn, *args, **kwargs):
- if isinstance(fn, type) and issubclass(fn, typehints.WithTypeHints):
+ if isinstance(fn, type) and issubclass(fn, WithTypeHints):
# Don't treat Fn class objects as callables.
raise ValueError('Use %s() not %s.' % (fn.__name__, fn.__name__))
self.fn = self.make_fn(fn)
@@ -577,7 +577,7 @@ class PTransformWithSideInputs(PTransform):
continue
if not typehints.is_consistent_with(
bindings.get(arg, typehints.Any), hint):
- raise typehints.TypeCheckError(
+ raise TypeCheckError(
'Type hint violation for \'%s\': requires %s but got %s for %s'
% (self.label, hint, bindings[arg], arg))
http://git-wip-us.apache.org/repos/asf/beam/blob/36fcd36c/sdks/python/apache_beam/typehints/decorators.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/decorators.py b/sdks/python/apache_beam/typehints/decorators.py
index 4eabdba..6ed388a 100644
--- a/sdks/python/apache_beam/typehints/decorators.py
+++ b/sdks/python/apache_beam/typehints/decorators.py
@@ -86,11 +86,20 @@ defined, or before importing a module containing type-hinted functions.
import inspect
import types
-from apache_beam.typehints import check_constraint
-from apache_beam.typehints import CompositeTypeHintError
-from apache_beam.typehints import SimpleTypeHintError
from apache_beam.typehints import typehints
-from apache_beam.typehints import validate_composite_type_param
+from apache_beam.typehints.typehints import check_constraint
+from apache_beam.typehints.typehints import CompositeTypeHintError
+from apache_beam.typehints.typehints import SimpleTypeHintError
+from apache_beam.typehints.typehints import validate_composite_type_param
+
+
+__all__ = [
+ 'with_input_types',
+ 'with_output_types',
+ 'WithTypeHints',
+ 'TypeCheckError',
+]
+
# This is missing in the builtin types module. str.upper is arbitrary, any
# method on a C-implemented type will do.
http://git-wip-us.apache.org/repos/asf/beam/blob/36fcd36c/sdks/python/apache_beam/typehints/opcodes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/opcodes.py b/sdks/python/apache_beam/typehints/opcodes.py
index 042acc0..83f444c 100644
--- a/sdks/python/apache_beam/typehints/opcodes.py
+++ b/sdks/python/apache_beam/typehints/opcodes.py
@@ -23,6 +23,8 @@ FrameState object, the second the integer opcode argument.
Bytecodes with more complicated behavior (e.g. modifying the program counter)
are handled inline rather than here.
+
+For internal use only; no backwards-compatibility guarantees.
"""
import types
http://git-wip-us.apache.org/repos/asf/beam/blob/36fcd36c/sdks/python/apache_beam/typehints/trivial_inference.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/trivial_inference.py b/sdks/python/apache_beam/typehints/trivial_inference.py
index 4581aa1..977ea06 100644
--- a/sdks/python/apache_beam/typehints/trivial_inference.py
+++ b/sdks/python/apache_beam/typehints/trivial_inference.py
@@ -16,6 +16,8 @@
#
"""Trivial type inference for simple functions.
+
+For internal use only; no backwards-compatibility guarantees.
"""
import __builtin__
import collections
http://git-wip-us.apache.org/repos/asf/beam/blob/36fcd36c/sdks/python/apache_beam/typehints/typecheck.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py
index 160d104..09b73f9 100644
--- a/sdks/python/apache_beam/typehints/typecheck.py
+++ b/sdks/python/apache_beam/typehints/typecheck.py
@@ -15,7 +15,10 @@
# limitations under the License.
#
-"""Runtime type checking support."""
+"""Runtime type checking support.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
import collections
import inspect
@@ -25,13 +28,13 @@ import types
from apache_beam.pvalue import TaggedOutput
from apache_beam.transforms.core import DoFn
from apache_beam.transforms.window import WindowedValue
-from apache_beam.typehints import check_constraint
-from apache_beam.typehints import CompositeTypeHintError
-from apache_beam.typehints import GeneratorWrapper
-from apache_beam.typehints import SimpleTypeHintError
-from apache_beam.typehints import TypeCheckError
from apache_beam.typehints.decorators import _check_instance_type
from apache_beam.typehints.decorators import getcallargs_forhints
+from apache_beam.typehints.decorators import GeneratorWrapper
+from apache_beam.typehints.decorators import TypeCheckError
+from apache_beam.typehints.typehints import check_constraint
+from apache_beam.typehints.typehints import CompositeTypeHintError
+from apache_beam.typehints.typehints import SimpleTypeHintError
class AbstractDoFnWrapper(DoFn):
http://git-wip-us.apache.org/repos/asf/beam/blob/36fcd36c/sdks/python/apache_beam/typehints/typehints.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py
index 9b41adb..cc430be 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -68,6 +68,24 @@ import copy
import types
+__all__ = [
+ 'Any',
+ 'Union',
+ 'Optional',
+ 'Tuple',
+ 'Tuple',
+ 'List',
+ 'KV',
+ 'Dict',
+ 'Set',
+ 'Iterable',
+ 'Iterator',
+ 'Generator',
+ 'WindowedValue',
+ 'TypeVariable',
+]
+
+
# A set of the built-in Python types we don't support, guiding the users
# to templated (upper-case) versions instead.
DISALLOWED_PRIMITIVE_TYPES = (list, set, tuple, dict)
http://git-wip-us.apache.org/repos/asf/beam/blob/36fcd36c/sdks/python/apache_beam/typehints/typehints_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typehints_test.py b/sdks/python/apache_beam/typehints/typehints_test.py
index f90b5e9..f1b92e0 100644
--- a/sdks/python/apache_beam/typehints/typehints_test.py
+++ b/sdks/python/apache_beam/typehints/typehints_test.py
@@ -21,9 +21,8 @@ import inspect
import unittest
-import apache_beam.typehints as typehints
+import apache_beam.typehints.typehints as typehints
from apache_beam.typehints import Any
-from apache_beam.typehints import is_consistent_with
from apache_beam.typehints import Tuple
from apache_beam.typehints import TypeCheckError
from apache_beam.typehints import Union
@@ -34,6 +33,8 @@ from apache_beam.typehints.decorators import _interleave_type_check
from apache_beam.typehints.decorators import _positional_arg_hints
from apache_beam.typehints.decorators import get_type_hints
from apache_beam.typehints.decorators import getcallargs_forhints
+from apache_beam.typehints.decorators import GeneratorWrapper
+from apache_beam.typehints.typehints import is_consistent_with
def check_or_interleave(hint, value, var):
@@ -712,7 +713,7 @@ class TestGeneratorWrapper(TypeHintTestCase):
l = []
interleave_func = lambda x: l.append(x)
- wrapped_gen = typehints.GeneratorWrapper(count(4), interleave_func)
+ wrapped_gen = GeneratorWrapper(count(4), interleave_func)
# Should function as a normal generator.
self.assertEqual(0, next(wrapped_gen))
@@ -1032,12 +1033,12 @@ class CombinedReturnsAndTakesTestCase(TypeHintTestCase):
class DecoratorHelpers(TypeHintTestCase):
def test_hint_helper(self):
- self.assertTrue(typehints.is_consistent_with(Any, int))
- self.assertTrue(typehints.is_consistent_with(int, Any))
- self.assertTrue(typehints.is_consistent_with(str, object))
- self.assertFalse(typehints.is_consistent_with(object, str))
- self.assertTrue(typehints.is_consistent_with(str, Union[str, int]))
- self.assertFalse(typehints.is_consistent_with(Union[str, int], str))
+ self.assertTrue(is_consistent_with(Any, int))
+ self.assertTrue(is_consistent_with(int, Any))
+ self.assertTrue(is_consistent_with(str, object))
+ self.assertFalse(is_consistent_with(object, str))
+ self.assertTrue(is_consistent_with(str, Union[str, int]))
+ self.assertFalse(is_consistent_with(Union[str, int], str))
def test_positional_arg_hints(self):
self.assertEquals(typehints.Any, _positional_arg_hints('x', {}))
[12/19] beam git commit: Mark internal modules in python datastoreio
Posted by al...@apache.org.
Mark internal modules in python datastoreio
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0a0cc2d6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0a0cc2d6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0a0cc2d6
Branch: refs/heads/release-2.0.0
Commit: 0a0cc2d6bd5c3c0e6ab9d1388d5d3c8dc5ca7760
Parents: 5b09511
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Thu May 11 11:33:24 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py | 7 +++++--
sdks/python/apache_beam/io/gcp/datastore/v1/helper.py | 5 ++++-
2 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0a0cc2d6/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py b/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
index bc4d07f..0caf6d6 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
@@ -15,7 +15,11 @@
# limitations under the License.
#
-"""Fake datastore used for unit testing."""
+"""Fake datastore used for unit testing.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
+
import uuid
# Protect against environments where datastore library is not available.
@@ -27,7 +31,6 @@ except ImportError:
pass
# pylint: enable=wrong-import-order, wrong-import-position
-
def create_run_query(entities, batch_size):
"""A fake datastore run_query method that returns entities in batches.
http://git-wip-us.apache.org/repos/asf/beam/blob/0a0cc2d6/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
index a61884f..9e2c053 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
@@ -15,7 +15,10 @@
# limitations under the License.
#
-"""Cloud Datastore helper functions."""
+"""Cloud Datastore helper functions.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
import sys
# Protect against environments where datastore library is not available.
[08/19] beam git commit: [BEAM-1340] Add __all__ tags to modules in
package apache_beam/transforms
Posted by al...@apache.org.
[BEAM-1340] Add __all__ tags to modules in package apache_beam/transforms
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6d02da03
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6d02da03
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6d02da03
Branch: refs/heads/release-2.0.0
Commit: 6d02da03981f08037538e63e9246efef63a36ea0
Parents: 0c784f9
Author: Charles Chen <cc...@google.com>
Authored: Wed May 10 23:06:36 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700
----------------------------------------------------------------------
.../examples/snippets/snippets_test.py | 2 +-
.../runners/dataflow/dataflow_runner.py | 2 +-
.../runners/dataflow/dataflow_runner_test.py | 5 +++--
sdks/python/apache_beam/transforms/core.py | 20 ++++++++++++++++++++
.../apache_beam/transforms/cy_combiners.py | 5 ++++-
.../python/apache_beam/transforms/ptransform.py | 9 +++++++++
.../apache_beam/transforms/ptransform_test.py | 11 ++++++-----
.../python/apache_beam/transforms/sideinputs.py | 2 ++
sdks/python/apache_beam/transforms/timeutil.py | 5 +++++
sdks/python/apache_beam/transforms/trigger.py | 14 ++++++++++++++
sdks/python/apache_beam/transforms/window.py | 15 +++++++++++++++
11 files changed, 80 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 211da24..37cd470 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -892,7 +892,7 @@ class CombineTest(unittest.TestCase):
unix_timestamp = extract_timestamp_from_log_entry(element)
# Wrap and emit the current entry and new timestamp in a
# TimestampedValue.
- yield beam.TimestampedValue(element, unix_timestamp)
+ yield beam.window.TimestampedValue(element, unix_timestamp)
timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())
# [END setting_timestamp]
http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 3d8437c..da8de9d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -169,7 +169,7 @@ class DataflowRunner(PipelineRunner):
def visit_transform(self, transform_node):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam import GroupByKey, GroupByKeyOnly
+ from apache_beam.transforms.core import GroupByKey, GroupByKeyOnly
if isinstance(transform_node.transform, (GroupByKey, GroupByKeyOnly)):
pcoll = transform_node.inputs[0]
input_type = pcoll.element_type
http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index b61a683..ac9b028 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -37,6 +37,7 @@ from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeExceptio
from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.display import DisplayDataItem
+from apache_beam.transforms.core import GroupByKeyOnly
from apache_beam.typehints import typehints
# Protect against environments where apitools library is not available.
@@ -184,7 +185,7 @@ class DataflowRunnerTest(unittest.TestCase):
pcoll1 = PCollection(p)
pcoll2 = PCollection(p)
pcoll3 = PCollection(p)
- for transform in [beam.GroupByKeyOnly(), beam.GroupByKey()]:
+ for transform in [GroupByKeyOnly(), beam.GroupByKey()]:
pcoll1.element_type = None
pcoll2.element_type = typehints.Any
pcoll3.element_type = typehints.KV[typehints.Any, typehints.Any]
@@ -198,7 +199,7 @@ class DataflowRunnerTest(unittest.TestCase):
p = TestPipeline()
pcoll1 = PCollection(p)
pcoll2 = PCollection(p)
- for transform in [beam.GroupByKeyOnly(), beam.GroupByKey()]:
+ for transform in [GroupByKeyOnly(), beam.GroupByKey()]:
pcoll1.element_type = typehints.TupleSequenceConstraint
pcoll2.element_type = typehints.Set
err_msg = "Input to GroupByKey must be of Tuple or Any type"
http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index d42115c..a1964cf 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -53,6 +53,26 @@ from apache_beam.utils import urns
from apache_beam.options.pipeline_options import TypeOptions
+__all__ = [
+ 'DoFn',
+ 'CombineFn',
+ 'PartitionFn',
+ 'ParDo',
+ 'FlatMap',
+ 'Map',
+ 'Filter',
+ 'CombineGlobally',
+ 'CombinePerKey',
+ 'CombineValues',
+ 'GroupByKey',
+ 'Partition',
+ 'Windowing',
+ 'WindowInto',
+ 'Flatten',
+ 'Create',
+ ]
+
+
# Type variables
T = typehints.TypeVariable('T')
K = typehints.TypeVariable('K')
http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/transforms/cy_combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/cy_combiners.py b/sdks/python/apache_beam/transforms/cy_combiners.py
index f824870..84aee21 100644
--- a/sdks/python/apache_beam/transforms/cy_combiners.py
+++ b/sdks/python/apache_beam/transforms/cy_combiners.py
@@ -15,7 +15,10 @@
# limitations under the License.
#
-"""A library of basic cythonized CombineFn subclasses."""
+"""A library of basic cythonized CombineFn subclasses.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
from __future__ import absolute_import
http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index fb79b19..d1f9835 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -60,6 +60,13 @@ from apache_beam.utils import proto_utils
from apache_beam.utils import urns
+__all__ = [
+ 'PTransform',
+ 'ptransform_fn',
+ 'label_from_callable',
+ ]
+
+
class _PValueishTransform(object):
"""Visitor for PValueish objects.
@@ -639,6 +646,8 @@ class CallablePTransform(PTransform):
def ptransform_fn(fn):
"""A decorator for a function-based PTransform.
+ Experimental; no backwards-compatibility guarantees.
+
Args:
fn: A function implementing a custom PTransform.
http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 137992d..3320d79 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -34,6 +34,7 @@ from apache_beam.options.pipeline_options import TypeOptions
import apache_beam.pvalue as pvalue
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms import window
+from apache_beam.transforms.core import GroupByKeyOnly
import apache_beam.transforms.combiners as combine
from apache_beam.transforms.display import DisplayData, DisplayDataItem
from apache_beam.transforms.ptransform import PTransform
@@ -579,7 +580,7 @@ class PTransformTest(unittest.TestCase):
pipeline = TestPipeline()
pcolls = pipeline | 'A' >> beam.Create(['a', 'b', 'f'])
with self.assertRaises(typehints.TypeCheckError) as cm:
- pcolls | 'D' >> beam.GroupByKeyOnly()
+ pcolls | 'D' >> GroupByKeyOnly()
pipeline.run()
expected_error_prefix = ('Input type hint violation at D: expected '
@@ -1087,7 +1088,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| 'Str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
| ('Pair' >> beam.Map(lambda x: (x, ord(x)))
.with_output_types(typehints.KV[str, str]))
- | beam.GroupByKeyOnly())
+ | GroupByKeyOnly())
# Output type should correctly be deduced.
# GBK-only should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
@@ -1111,7 +1112,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
| beam.Create([1, 2, 3]).with_output_types(int)
- | 'F' >> beam.GroupByKeyOnly())
+ | 'F' >> GroupByKeyOnly())
self.assertEqual("Input type hint violation at F: "
"expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -1154,7 +1155,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
(self.p
| 'Nums' >> beam.Create(range(5)).with_output_types(int)
| 'ModDup' >> beam.Map(lambda x: (x % 2, x))
- | beam.GroupByKeyOnly())
+ | GroupByKeyOnly())
self.assertEqual('Pipeline type checking is enabled, however no output '
'type-hint was found for the PTransform '
@@ -1977,7 +1978,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_gbk_type_inference(self):
self.assertEqual(
typehints.Tuple[str, typehints.Iterable[int]],
- beam.core.GroupByKeyOnly().infer_output_type(typehints.KV[str, int]))
+ GroupByKeyOnly().infer_output_type(typehints.KV[str, int]))
def test_pipeline_inference(self):
created = self.p | beam.Create(['a', 'b', 'c'])
http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/transforms/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py
index 6ba5311..f10cb92 100644
--- a/sdks/python/apache_beam/transforms/sideinputs.py
+++ b/sdks/python/apache_beam/transforms/sideinputs.py
@@ -17,6 +17,8 @@
"""Internal side input transforms and implementations.
+For internal use only; no backwards-compatibility guarantees.
+
Important: this module is an implementation detail and should not be used
directly by pipeline writers. Instead, users should use the helper methods
AsSingleton, AsIter, AsList and AsDict in apache_beam.pvalue.
http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/transforms/timeutil.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py
index ba4ef36..c0f9198 100644
--- a/sdks/python/apache_beam/transforms/timeutil.py
+++ b/sdks/python/apache_beam/transforms/timeutil.py
@@ -23,6 +23,11 @@ from abc import ABCMeta
from abc import abstractmethod
+__all__ = [
+ 'TimeDomain',
+ ]
+
+
class TimeDomain(object):
"""Time domain for streaming timers."""
http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/transforms/trigger.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 2cb7ce3..7de2f85 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -38,6 +38,20 @@ from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import MIN_TIMESTAMP
+__all__ = [
+ 'AccumulationMode',
+ 'TriggerFn',
+ 'DefaultTrigger',
+ 'AfterWatermark',
+ 'AfterCount',
+ 'Repeatedly',
+ 'AfterAny',
+ 'AfterAll',
+ 'AfterEach',
+ 'OrFinally',
+ ]
+
+
class AccumulationMode(object):
"""Controls what to do with data when a trigger fires multiple times.
"""
http://git-wip-us.apache.org/repos/asf/beam/blob/6d02da03/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 6d0db3a..94187e0 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -65,6 +65,21 @@ from apache_beam.utils.timestamp import Timestamp
from apache_beam.utils.windowed_value import WindowedValue
+__all__ = [
+ 'TimestampCombiner',
+ 'WindowFn',
+ 'BoundedWindow',
+ 'IntervalWindow',
+ 'TimestampedValue',
+ 'GlobalWindow',
+ 'NonMergingWindowFn',
+ 'GlobalWindows',
+ 'FixedWindows',
+ 'SlidingWindows',
+ 'Sessions',
+ ]
+
+
# TODO(ccy): revisit naming and semantics once Java Apache Beam finalizes their
# behavior.
class TimestampCombiner(object):
[14/19] beam git commit: [BEAM-1345] Mark apache_beam/internal as
internal.
Posted by al...@apache.org.
[BEAM-1345] Mark apache_beam/internal as internal.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2bb95fd6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2bb95fd6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2bb95fd6
Branch: refs/heads/release-2.0.0
Commit: 2bb95fd68635ea1932f3a7d7cc59fcef1644deea
Parents: 36fcd36
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu May 11 12:09:28 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:37 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/internal/__init__.py | 2 ++
sdks/python/apache_beam/internal/gcp/__init__.py | 2 ++
2 files changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2bb95fd6/sdks/python/apache_beam/internal/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/__init__.py b/sdks/python/apache_beam/internal/__init__.py
index cce3aca..0bce5d6 100644
--- a/sdks/python/apache_beam/internal/__init__.py
+++ b/sdks/python/apache_beam/internal/__init__.py
@@ -14,3 +14,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
+"""For internal use only; no backwards-compatibility guarantees."""
http://git-wip-us.apache.org/repos/asf/beam/blob/2bb95fd6/sdks/python/apache_beam/internal/gcp/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/gcp/__init__.py b/sdks/python/apache_beam/internal/gcp/__init__.py
index cce3aca..0bce5d6 100644
--- a/sdks/python/apache_beam/internal/gcp/__init__.py
+++ b/sdks/python/apache_beam/internal/gcp/__init__.py
@@ -14,3 +14,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
+"""For internal use only; no backwards-compatibility guarantees."""
[16/19] beam git commit: Fix due to GBKO name change.
Posted by al...@apache.org.
Fix due to GBKO name change.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a6f5888e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a6f5888e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a6f5888e
Branch: refs/heads/release-2.0.0
Commit: a6f5888e42ef34eacfe6384c0de9df9371ae968c
Parents: e12cf0d
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu May 11 15:47:54 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:37 2017 -0700
----------------------------------------------------------------------
.../apache_beam/runners/portability/maptask_executor_runner.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a6f5888e/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
index 077871e..ddfc4cc 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
@@ -243,7 +243,7 @@ class MapTaskExecutorRunner(PipelineRunner):
(label, write_sideinput_op))
return output_buffer
- def run_GroupByKeyOnly(self, transform_node):
+ def run__GroupByKeyOnly(self, transform_node):
map_task_index, producer_index, output_index = self.outputs[
transform_node.inputs[0]]
grouped_element_coder = self._get_coder(transform_node.outputs[None],
[03/19] beam git commit: [BEAM-1345] Mark Pipeline as public.
Posted by al...@apache.org.
[BEAM-1345] Mark Pipeline as public.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0ce25430
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0ce25430
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0ce25430
Branch: refs/heads/release-2.0.0
Commit: 0ce25430ffec49fb2d94271e4af6225cee20388c
Parents: aeeefc1
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu May 11 13:30:32 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 7 +++++++
1 file changed, 7 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0ce25430/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 83c7287..ec8dde4 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -67,6 +67,9 @@ from apache_beam.options.pipeline_options_validator import PipelineOptionsValida
from apache_beam.utils.annotations import deprecated
+__all__ = ['Pipeline']
+
+
class Pipeline(object):
"""A pipeline object that manages a DAG of PValues and their PTransforms.
@@ -182,6 +185,8 @@ class Pipeline(object):
def visit(self, visitor):
"""Visits depth-first every node of a pipeline's DAG.
+ Runner-internal implementation detail; no backwards-compatibility guarantees
+
Args:
visitor: PipelineVisitor object whose callbacks will be called for each
node visited. See PipelineVisitor comments.
@@ -333,6 +338,7 @@ class Pipeline(object):
return Visitor.ok
def to_runner_api(self):
+ """For internal use only; no backwards-compatibility guarantees."""
from apache_beam.runners import pipeline_context
from apache_beam.runners.api import beam_runner_api_pb2
context = pipeline_context.PipelineContext()
@@ -346,6 +352,7 @@ class Pipeline(object):
@staticmethod
def from_runner_api(proto, runner, options):
+ """For internal use only; no backwards-compatibility guarantees."""
p = Pipeline(runner=runner, options=options)
from apache_beam.runners import pipeline_context
context = pipeline_context.PipelineContext(proto.components)
[17/19] beam git commit: Move assert_that, equal_to,
is_empty to apache_beam.testing.util
Posted by al...@apache.org.
Move assert_that, equal_to, is_empty to apache_beam.testing.util
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2070f118
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2070f118
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2070f118
Branch: refs/heads/release-2.0.0
Commit: 2070f1182d49e3b7b3e9ed8a35173cb165fa5bfb
Parents: d0da682
Author: Charles Chen <cc...@google.com>
Authored: Thu May 11 15:07:30 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:37 2017 -0700
----------------------------------------------------------------------
.../examples/complete/autocomplete_test.py | 4 +-
.../examples/complete/estimate_pi_test.py | 4 +-
.../complete/game/hourly_team_score_test.py | 4 +-
.../examples/complete/game/user_score_test.py | 4 +-
.../apache_beam/examples/complete/tfidf_test.py | 4 +-
.../complete/top_wikipedia_sessions_test.py | 4 +-
.../cookbook/bigquery_side_input_test.py | 4 +-
.../cookbook/bigquery_tornadoes_test.py | 6 +-
.../examples/cookbook/coders_test.py | 4 +-
.../examples/cookbook/combiners_test.py | 6 +-
.../examples/cookbook/custom_ptransform_test.py | 4 +-
.../examples/cookbook/filters_test.py | 12 ++-
.../examples/cookbook/mergecontacts.py | 14 +--
.../apache_beam/examples/snippets/snippets.py | 17 +--
.../examples/snippets/snippets_test.py | 30 +++---
.../apache_beam/examples/wordcount_debugging.py | 6 +-
sdks/python/apache_beam/io/avroio_test.py | 4 +-
.../python/apache_beam/io/concat_source_test.py | 4 +-
.../apache_beam/io/filebasedsource_test.py | 4 +-
sdks/python/apache_beam/io/sources_test.py | 4 +-
sdks/python/apache_beam/io/textio_test.py | 5 +-
sdks/python/apache_beam/io/tfrecordio_test.py | 24 +++--
sdks/python/apache_beam/pipeline_test.py | 4 +-
.../portability/maptask_executor_runner_test.py | 6 +-
sdks/python/apache_beam/runners/runner_test.py | 4 +-
sdks/python/apache_beam/testing/util.py | 107 +++++++++++++++++++
sdks/python/apache_beam/testing/util_test.py | 50 +++++++++
.../apache_beam/transforms/combiners_test.py | 2 +-
.../apache_beam/transforms/create_test.py | 3 +-
.../apache_beam/transforms/ptransform_test.py | 2 +-
.../apache_beam/transforms/sideinputs_test.py | 2 +-
.../apache_beam/transforms/trigger_test.py | 2 +-
sdks/python/apache_beam/transforms/util.py | 79 --------------
sdks/python/apache_beam/transforms/util_test.py | 50 ---------
.../apache_beam/transforms/window_test.py | 2 +-
.../transforms/write_ptransform_test.py | 2 +-
.../typehints/typed_pipeline_test.py | 2 +-
37 files changed, 271 insertions(+), 218 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 438633a..378d222 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -22,8 +22,8 @@ import unittest
import apache_beam as beam
from apache_beam.examples.complete import autocomplete
from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class AutocompleteTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
index 12d8379..fd51309 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
@@ -22,8 +22,8 @@ import unittest
from apache_beam.examples.complete import estimate_pi
from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import BeamAssertException
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import BeamAssertException
def in_between(lower, upper):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
index bd0abca..9c30127 100644
--- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
@@ -23,6 +23,8 @@ import unittest
import apache_beam as beam
from apache_beam.examples.complete.game import hourly_team_score
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class HourlyTeamScoreTest(unittest.TestCase):
@@ -44,7 +46,7 @@ class HourlyTeamScoreTest(unittest.TestCase):
start_min='2015-11-16-15-20',
stop_min='2015-11-16-17-20',
window_duration=60))
- beam.assert_that(result, beam.equal_to([
+ assert_that(result, equal_to([
('team1', 18), ('team2', 2), ('team3', 13)]))
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/game/user_score_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score_test.py b/sdks/python/apache_beam/examples/complete/game/user_score_test.py
index 2db53bd..59903d9 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score_test.py
@@ -23,6 +23,8 @@ import unittest
import apache_beam as beam
from apache_beam.examples.complete.game import user_score
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class UserScoreTest(unittest.TestCase):
@@ -40,7 +42,7 @@ class UserScoreTest(unittest.TestCase):
with TestPipeline() as p:
result = (
p | beam.Create(UserScoreTest.SAMPLE_DATA) | user_score.UserScore())
- beam.assert_that(result, beam.equal_to([
+ assert_that(result, equal_to([
('user1_team1', 50), ('user2_team2', 2), ('user3_team3', 8),
('user4_team3', 5)]))
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index 0e30254..f177dfc 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -26,6 +26,8 @@ import unittest
import apache_beam as beam
from apache_beam.examples.complete import tfidf
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
EXPECTED_RESULTS = set([
@@ -57,7 +59,7 @@ class TfIdfTest(unittest.TestCase):
uri_to_line
| tfidf.TfIdf()
| beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
- beam.assert_that(result, beam.equal_to(EXPECTED_RESULTS))
+ assert_that(result, equal_to(EXPECTED_RESULTS))
# Run the pipeline. Note that the assert_that above adds to the pipeline
# a check that the result PCollection contains expected values. To actually
# trigger the check the pipeline must be run.
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
index 4850c04..5fb6276 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -24,6 +24,8 @@ import unittest
import apache_beam as beam
from apache_beam.examples.complete import top_wikipedia_sessions
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class ComputeTopSessionsTest(unittest.TestCase):
@@ -54,7 +56,7 @@ class ComputeTopSessionsTest(unittest.TestCase):
edits = p | beam.Create(self.EDITS)
result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
- beam.assert_that(result, beam.equal_to(self.EXPECTED))
+ assert_that(result, equal_to(self.EXPECTED))
p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index 1ca25c9..b11dc47 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -23,6 +23,8 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import bigquery_side_input
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class BigQuerySideInputTest(unittest.TestCase):
@@ -42,7 +44,7 @@ class BigQuerySideInputTest(unittest.TestCase):
words_pcoll, ignore_corpus_pcoll,
ignore_word_pcoll)
- beam.assert_that(groups, beam.equal_to(
+ assert_that(groups, equal_to(
[('A', 'corpus2', 'word2'),
('B', 'corpus2', 'word2'),
('C', 'corpus2', 'word2')]))
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
index ca7ca9e..c926df8 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
@@ -23,6 +23,8 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import bigquery_tornadoes
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class BigQueryTornadoesTest(unittest.TestCase):
@@ -35,8 +37,8 @@ class BigQueryTornadoesTest(unittest.TestCase):
{'month': 1, 'day': 3, 'tornado': True},
{'month': 2, 'day': 1, 'tornado': True}]))
results = bigquery_tornadoes.count_tornadoes(rows)
- beam.assert_that(results, beam.equal_to([{'month': 1, 'tornado_count': 2},
- {'month': 2, 'tornado_count': 1}]))
+ assert_that(results, equal_to([{'month': 1, 'tornado_count': 2},
+ {'month': 2, 'tornado_count': 1}]))
p.run().wait_until_finish()
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py
index 35cf252..f71dad8 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py
@@ -23,8 +23,8 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import coders
from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class CodersTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/combiners_test.py b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
index 45c779f..ee1fb77 100644
--- a/sdks/python/apache_beam/examples/cookbook/combiners_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/combiners_test.py
@@ -28,6 +28,8 @@ import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class CombinersTest(unittest.TestCase):
@@ -49,7 +51,7 @@ class CombinersTest(unittest.TestCase):
| beam.Create(CombinersTest.SAMPLE_DATA)
| beam.CombinePerKey(sum))
- beam.assert_that(result, beam.equal_to([('a', 6), ('b', 30), ('c', 100)]))
+ assert_that(result, equal_to([('a', 6), ('b', 30), ('c', 100)]))
result.pipeline.run()
def test_combine_per_key_with_custom_callable(self):
@@ -65,7 +67,7 @@ class CombinersTest(unittest.TestCase):
| beam.Create(CombinersTest.SAMPLE_DATA)
| beam.CombinePerKey(multiply))
- beam.assert_that(result, beam.equal_to([('a', 6), ('b', 200), ('c', 100)]))
+ assert_that(result, equal_to([('a', 6), ('b', 200), ('c', 100)]))
result.pipeline.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index 2d35d8d..c7c6dba 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -23,8 +23,8 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import custom_ptransform
from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class CustomCountTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/filters_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters_test.py b/sdks/python/apache_beam/examples/cookbook/filters_test.py
index 44a352f..fd49f93 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters_test.py
@@ -23,6 +23,8 @@ import unittest
import apache_beam as beam
from apache_beam.examples.cookbook import filters
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class FiltersTest(unittest.TestCase):
@@ -45,22 +47,22 @@ class FiltersTest(unittest.TestCase):
def test_basic(self):
"""Test that the correct result is returned for a simple dataset."""
results = self._get_result_for_month(1)
- beam.assert_that(
+ assert_that(
results,
- beam.equal_to([{'year': 2010, 'month': 1, 'day': 1, 'mean_temp': 3},
- {'year': 2012, 'month': 1, 'day': 2, 'mean_temp': 3}]))
+ equal_to([{'year': 2010, 'month': 1, 'day': 1, 'mean_temp': 3},
+ {'year': 2012, 'month': 1, 'day': 2, 'mean_temp': 3}]))
results.pipeline.run()
def test_basic_empty(self):
"""Test that the correct empty result is returned for a simple dataset."""
results = self._get_result_for_month(3)
- beam.assert_that(results, beam.equal_to([]))
+ assert_that(results, equal_to([]))
results.pipeline.run()
def test_basic_empty_missing(self):
"""Test that the correct empty result is returned for a missing month."""
results = self._get_result_for_month(4)
- beam.assert_that(results, beam.equal_to([]))
+ assert_that(results, equal_to([]))
results.pipeline.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 5aaba10..4f53c61 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -40,6 +40,8 @@ from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
def run(argv=None, assert_results=None):
@@ -118,12 +120,12 @@ def run(argv=None, assert_results=None):
# TODO(silviuc): Move the assert_results logic to the unit test.
if assert_results is not None:
expected_luddites, expected_writers, expected_nomads = assert_results
- beam.assert_that(num_luddites, beam.equal_to([expected_luddites]),
- label='assert:luddites')
- beam.assert_that(num_writers, beam.equal_to([expected_writers]),
- label='assert:writers')
- beam.assert_that(num_nomads, beam.equal_to([expected_nomads]),
- label='assert:nomads')
+ assert_that(num_luddites, equal_to([expected_luddites]),
+ label='assert:luddites')
+ assert_that(num_writers, equal_to([expected_writers]),
+ label='assert:writers')
+ assert_that(num_nomads, equal_to([expected_nomads]),
+ label='assert:nomads')
# Execute pipeline.
return p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 1bdb9a3..7259572 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -33,6 +33,8 @@ string. The tags can contain only letters, digits and _.
import apache_beam as beam
from apache_beam.metrics import Metrics
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
# Quiet some pylint warnings that happen because of the somewhat special
# format for the code snippets.
@@ -566,8 +568,9 @@ def examples_wordcount_debugging(renames):
| 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
# [START example_wordcount_debugging_assert]
- beam.assert_that(
- filtered_words, beam.equal_to([('Flourish', 3), ('stomach', 1)]))
+ beam.testing.util.assert_that(
+ filtered_words, beam.testing.util.equal_to(
+ [('Flourish', 3), ('stomach', 1)]))
# [END example_wordcount_debugging_assert]
output = (filtered_words
@@ -661,8 +664,8 @@ def model_custom_source(count):
# [END model_custom_source_use_new_source]
lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
- beam.assert_that(
- lines, beam.equal_to(
+ assert_that(
+ lines, equal_to(
['line ' + str(number) for number in range(0, count)]))
p.run().wait_until_finish()
@@ -691,8 +694,8 @@ def model_custom_source(count):
# [END model_custom_source_use_ptransform]
lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
- beam.assert_that(
- lines, beam.equal_to(
+ assert_that(
+ lines, equal_to(
['line ' + str(number) for number in range(0, count)]))
# Don't test runner api due to pickling errors.
@@ -872,7 +875,7 @@ def model_textio_compressed(renames, expected):
compression_type=beam.io.filesystem.CompressionTypes.GZIP)
# [END model_textio_write_compressed]
- beam.assert_that(lines, beam.equal_to(expected))
+ assert_that(lines, equal_to(expected))
p.visit(SnippetUtils.RenameFiles(renames))
p.run().wait_until_finish()
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 37cd470..f7b51a7 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -30,10 +30,10 @@ from apache_beam import coders
from apache_beam import pvalue
from apache_beam import typehints
from apache_beam.coders.coders import ToStringCoder
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.examples.snippets import snippets
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
from apache_beam.utils.windowed_value import WindowedValue
# pylint: disable=expression-not-assigned
@@ -158,11 +158,11 @@ class ParDoTest(unittest.TestCase):
avg_word_len))
# [END model_pardo_side_input]
- beam.assert_that(small_words, beam.equal_to(['a', 'bb', 'ccc']))
- beam.assert_that(larger_than_average, beam.equal_to(['ccc', 'dddd']),
- label='larger_than_average')
- beam.assert_that(small_but_nontrivial, beam.equal_to(['bb']),
- label='small_but_not_trivial')
+ assert_that(small_words, equal_to(['a', 'bb', 'ccc']))
+ assert_that(larger_than_average, equal_to(['ccc', 'dddd']),
+ label='larger_than_average')
+ assert_that(small_but_nontrivial, equal_to(['bb']),
+ label='small_but_not_trivial')
p.run()
def test_pardo_side_input_dofn(self):
@@ -816,7 +816,7 @@ class CombineTest(unittest.TestCase):
| 'group' >> beam.GroupByKey()
| 'combine' >> beam.CombineValues(sum))
unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
- beam.assert_that(unkeyed, beam.equal_to([110, 215, 120]))
+ assert_that(unkeyed, equal_to([110, 215, 120]))
p.run()
def test_setting_sliding_windows(self):
@@ -834,8 +834,8 @@ class CombineTest(unittest.TestCase):
| 'group' >> beam.GroupByKey()
| 'combine' >> beam.CombineValues(sum))
unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
- beam.assert_that(unkeyed,
- beam.equal_to([2, 2, 2, 18, 23, 39, 39, 39, 41, 41]))
+ assert_that(unkeyed,
+ equal_to([2, 2, 2, 18, 23, 39, 39, 39, 41, 41]))
p.run()
def test_setting_session_windows(self):
@@ -853,8 +853,8 @@ class CombineTest(unittest.TestCase):
| 'group' >> beam.GroupByKey()
| 'combine' >> beam.CombineValues(sum))
unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
- beam.assert_that(unkeyed,
- beam.equal_to([29, 27]))
+ assert_that(unkeyed,
+ equal_to([29, 27]))
p.run()
def test_setting_global_window(self):
@@ -872,7 +872,7 @@ class CombineTest(unittest.TestCase):
| 'group' >> beam.GroupByKey()
| 'combine' >> beam.CombineValues(sum))
unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
- beam.assert_that(unkeyed, beam.equal_to([56]))
+ assert_that(unkeyed, equal_to([56]))
p.run()
def test_setting_timestamp(self):
@@ -903,7 +903,7 @@ class CombineTest(unittest.TestCase):
| 'group' >> beam.GroupByKey()
| 'combine' >> beam.CombineValues(sum))
unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
- beam.assert_that(unkeyed, beam.equal_to([42, 187]))
+ assert_that(unkeyed, equal_to([42, 187]))
p.run()
@@ -921,7 +921,7 @@ class PTransformTest(unittest.TestCase):
p = TestPipeline()
lengths = p | beam.Create(["a", "ab", "abc"]) | ComputeWordLengths()
- beam.assert_that(lengths, beam.equal_to([1, 2, 3]))
+ assert_that(lengths, equal_to([1, 2, 3]))
p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index 98acde4..ca9f7b6 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -51,6 +51,8 @@ from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class FilterTextFn(beam.DoFn):
@@ -133,8 +135,8 @@ def run(argv=None):
# of the Pipeline implies that the expectations were met. Learn more at
# https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to
# test your pipeline.
- beam.assert_that(
- filtered_words, beam.equal_to([('Flourish', 3), ('stomach', 1)]))
+ assert_that(
+ filtered_words, equal_to([('Flourish', 3), ('stomach', 1)]))
# Format the counts into a PCollection of strings and write the output using a
# "Write" transform that has side effects.
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index 4a21839..6dcf121 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -27,10 +27,10 @@ from apache_beam.io import avroio
from apache_beam.io import filebasedsource
from apache_beam.io import source_test_utils
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
# Importing following private class for testing purposes.
from apache_beam.io.avroio import _AvroSource as AvroSource
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/concat_source_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py
index a02f9ad..4a8f519 100644
--- a/sdks/python/apache_beam/io/concat_source_test.py
+++ b/sdks/python/apache_beam/io/concat_source_test.py
@@ -27,8 +27,8 @@ from apache_beam.io import range_trackers
from apache_beam.io import source_test_utils
from apache_beam.io.concat_source import ConcatSource
from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class RangeSource(iobase.BoundedSource):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index e17a004..afb340d 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -41,10 +41,10 @@ from apache_beam.io.filebasedsource import FileBasedSource
from apache_beam.options.value_provider import StaticValueProvider
from apache_beam.options.value_provider import RuntimeValueProvider
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
class LineSource(FileBasedSource):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/sources_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index c0b8ad6..10d401b 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -28,8 +28,8 @@ from apache_beam import coders
from apache_beam.io import iobase
from apache_beam.io import range_trackers
from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
class LineSource(iobase.BoundedSource):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index d00afef..9a4ec47 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -44,9 +44,8 @@ from apache_beam.io.filebasedsource_test import write_pattern
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.testing.test_pipeline import TestPipeline
-
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
# TODO: Refactor code so all io tests are using same library
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/tfrecordio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py
index b7e370d..3c70ade 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -36,6 +36,8 @@ from apache_beam.io.tfrecordio import _TFRecordUtil
from apache_beam.io.tfrecordio import ReadFromTFRecord
from apache_beam.io.tfrecordio import WriteToTFRecord
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
import crcmod
@@ -254,7 +256,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
coder=coders.BytesCoder(),
compression_type=CompressionTypes.AUTO,
validate=True)))
- beam.assert_that(result, beam.equal_to(['foo']))
+ assert_that(result, equal_to(['foo']))
def test_process_multiple(self):
path = os.path.join(self._new_tempdir(), 'result')
@@ -267,7 +269,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
coder=coders.BytesCoder(),
compression_type=CompressionTypes.AUTO,
validate=True)))
- beam.assert_that(result, beam.equal_to(['foo', 'bar']))
+ assert_that(result, equal_to(['foo', 'bar']))
def test_process_gzip(self):
path = os.path.join(self._new_tempdir(), 'result')
@@ -280,7 +282,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
coder=coders.BytesCoder(),
compression_type=CompressionTypes.GZIP,
validate=True)))
- beam.assert_that(result, beam.equal_to(['foo', 'bar']))
+ assert_that(result, equal_to(['foo', 'bar']))
def test_process_auto(self):
path = os.path.join(self._new_tempdir(), 'result.gz')
@@ -293,7 +295,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
coder=coders.BytesCoder(),
compression_type=CompressionTypes.AUTO,
validate=True)))
- beam.assert_that(result, beam.equal_to(['foo', 'bar']))
+ assert_that(result, equal_to(['foo', 'bar']))
class TestReadFromTFRecordSource(TestTFRecordSource):
@@ -305,7 +307,7 @@ class TestReadFromTFRecordSource(TestTFRecordSource):
result = (p
| ReadFromTFRecord(
path, compression_type=CompressionTypes.GZIP))
- beam.assert_that(result, beam.equal_to(['foo', 'bar']))
+ assert_that(result, equal_to(['foo', 'bar']))
def test_process_gzip_auto(self):
path = os.path.join(self._new_tempdir(), 'result.gz')
@@ -314,7 +316,7 @@ class TestReadFromTFRecordSource(TestTFRecordSource):
result = (p
| ReadFromTFRecord(
path, compression_type=CompressionTypes.AUTO))
- beam.assert_that(result, beam.equal_to(['foo', 'bar']))
+ assert_that(result, equal_to(['foo', 'bar']))
class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
@@ -337,7 +339,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
# Read the file back and compare.
with TestPipeline() as p:
actual_data = p | ReadFromTFRecord(file_path_prefix + '-*')
- beam.assert_that(actual_data, beam.equal_to(expected_data))
+ assert_that(actual_data, equal_to(expected_data))
def test_end2end_auto_compression(self):
file_path_prefix = os.path.join(self._new_tempdir(), 'result')
@@ -351,7 +353,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
# Read the file back and compare.
with TestPipeline() as p:
actual_data = p | ReadFromTFRecord(file_path_prefix + '-*')
- beam.assert_that(actual_data, beam.equal_to(expected_data))
+ assert_that(actual_data, equal_to(expected_data))
def test_end2end_auto_compression_unsharded(self):
file_path_prefix = os.path.join(self._new_tempdir(), 'result')
@@ -365,7 +367,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
# Read the file back and compare.
with TestPipeline() as p:
actual_data = p | ReadFromTFRecord(file_path_prefix + '.gz')
- beam.assert_that(actual_data, beam.equal_to(expected_data))
+ assert_that(actual_data, equal_to(expected_data))
@unittest.skipIf(tf is None, 'tensorflow not installed.')
def test_end2end_example_proto(self):
@@ -385,7 +387,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
actual_data = (p | ReadFromTFRecord(
file_path_prefix + '-*',
coder=beam.coders.ProtoCoder(example.__class__)))
- beam.assert_that(actual_data, beam.equal_to([example]))
+ assert_that(actual_data, equal_to([example]))
def test_end2end_read_write_read(self):
path = os.path.join(self._new_tempdir(), 'result')
@@ -400,7 +402,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
# Read the file back and compare.
with TestPipeline() as p:
actual_data = p | ReadFromTFRecord(path+'-*', validate=True)
- beam.assert_that(actual_data, beam.equal_to(expected_data))
+ assert_that(actual_data, equal_to(expected_data))
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 8aa8a8a..e0775d1 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -33,6 +33,8 @@ from apache_beam.pipeline import PipelineVisitor
from apache_beam.pvalue import AsSingleton
from apache_beam.runners.dataflow.native_io.iobase import NativeSource
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
from apache_beam.transforms import CombineGlobally
from apache_beam.transforms import Create
from apache_beam.transforms import FlatMap
@@ -41,8 +43,6 @@ from apache_beam.transforms import DoFn
from apache_beam.transforms import ParDo
from apache_beam.transforms import PTransform
from apache_beam.transforms import WindowInto
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
from apache_beam.transforms.window import SlidingWindows
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils.timestamp import MIN_TIMESTAMP
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
index 062e6f9..b7ba15a 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
@@ -28,9 +28,9 @@ from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.metrics.metricbase import MetricName
from apache_beam.pvalue import AsList
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import BeamAssertException
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import BeamAssertException
+from apache_beam.testing.util import equal_to
from apache_beam.transforms.window import TimestampedValue
from apache_beam.runners.portability import maptask_executor_runner
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index c61c49f..fa80b1c 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -36,8 +36,8 @@ from apache_beam.metrics.metricbase import MetricName
from apache_beam.pipeline import Pipeline
from apache_beam.runners import DirectRunner
from apache_beam.runners import create_runner
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
from apache_beam.options.pipeline_options import PipelineOptions
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/testing/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
new file mode 100644
index 0000000..60a6b21
--- /dev/null
+++ b/sdks/python/apache_beam/testing/util.py
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utilities for testing Beam pipelines."""
+
+from __future__ import absolute_import
+
+from apache_beam import pvalue
+from apache_beam.transforms import window
+from apache_beam.transforms.core import Create
+from apache_beam.transforms.core import Map
+from apache_beam.transforms.core import WindowInto
+from apache_beam.transforms.util import CoGroupByKey
+from apache_beam.transforms.ptransform import PTransform
+
+
+__all__ = [
+ 'assert_that',
+ 'equal_to',
+ 'is_empty',
+ ]
+
+
+class BeamAssertException(Exception):
+ """Exception raised by matcher classes used by assert_that transform."""
+
+ pass
+
+
+# Note that equal_to always sorts the expected and actual since what we
+# compare are PCollections for which there is no guaranteed order.
+# However the sorting does not go beyond top level therefore [1,2] and [2,1]
+# are considered equal and [[1,2]] and [[2,1]] are not.
+# TODO(silviuc): Add contains_in_any_order-style matchers.
+def equal_to(expected):
+ expected = list(expected)
+
+ def _equal(actual):
+ sorted_expected = sorted(expected)
+ sorted_actual = sorted(actual)
+ if sorted_expected != sorted_actual:
+ raise BeamAssertException(
+ 'Failed assert: %r == %r' % (sorted_expected, sorted_actual))
+ return _equal
+
+
+def is_empty():
+ def _empty(actual):
+ actual = list(actual)
+ if actual:
+ raise BeamAssertException(
+ 'Failed assert: [] == %r' % actual)
+ return _empty
+
+
+def assert_that(actual, matcher, label='assert_that'):
+ """A PTransform that checks a PCollection has an expected value.
+
+ Note that assert_that should be used only for testing pipelines since the
+ check relies on materializing the entire PCollection being checked.
+
+ Args:
+ actual: A PCollection.
+ matcher: A matcher function taking as argument the actual value of a
+ materialized PCollection. The matcher validates this actual value against
+ expectations and raises BeamAssertException if they are not met.
+ label: Optional string label. This is needed in case several assert_that
+ transforms are introduced in the same pipeline.
+
+ Returns:
+ Ignored.
+ """
+ assert isinstance(actual, pvalue.PCollection)
+
+ class AssertThat(PTransform):
+
+ def expand(self, pcoll):
+ # We must have at least a single element to ensure the matcher
+ # code gets run even if the input pcollection is empty.
+ keyed_singleton = pcoll.pipeline | Create([(None, None)])
+ keyed_actual = (
+ pcoll
+ | WindowInto(window.GlobalWindows())
+ | "ToVoidKey" >> Map(lambda v: (None, v)))
+ _ = ((keyed_singleton, keyed_actual)
+ | "Group" >> CoGroupByKey()
+ | "Unkey" >> Map(lambda (k, (_, actual_values)): actual_values)
+ | "Match" >> Map(matcher))
+
+ def default_label(self):
+ return label
+
+ actual | AssertThat() # pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/testing/util_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/util_test.py b/sdks/python/apache_beam/testing/util_test.py
new file mode 100644
index 0000000..1acebb6
--- /dev/null
+++ b/sdks/python/apache_beam/testing/util_test.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for testing utilities."""
+
+import unittest
+
+from apache_beam import Create
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to, is_empty
+
+
+class UtilTest(unittest.TestCase):
+
+ def test_assert_that_passes(self):
+ with TestPipeline() as p:
+ assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3]))
+
+ def test_assert_that_fails(self):
+ with self.assertRaises(Exception):
+ with TestPipeline() as p:
+ assert_that(p | Create([1, 10, 100]), equal_to([1, 2, 3]))
+
+ def test_assert_that_fails_on_empty_input(self):
+ with self.assertRaises(Exception):
+ with TestPipeline() as p:
+ assert_that(p | Create([]), equal_to([1, 2, 3]))
+
+ def test_assert_that_fails_on_empty_expected(self):
+ with self.assertRaises(Exception):
+ with TestPipeline() as p:
+ assert_that(p | Create([1, 2, 3]), is_empty())
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 1822c19..946a60a 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -24,13 +24,13 @@ import hamcrest as hc
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
import apache_beam.transforms.combiners as combine
+from apache_beam.testing.util import assert_that, equal_to
from apache_beam.transforms.core import CombineGlobally
from apache_beam.transforms.core import Create
from apache_beam.transforms.core import Map
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
from apache_beam.transforms.ptransform import PTransform
-from apache_beam.transforms.util import assert_that, equal_to
class CombineTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/create_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/create_test.py b/sdks/python/apache_beam/transforms/create_test.py
index 9ede4c7..55ad7f3 100644
--- a/sdks/python/apache_beam/transforms/create_test.py
+++ b/sdks/python/apache_beam/transforms/create_test.py
@@ -20,9 +20,10 @@ import unittest
from apache_beam.io import source_test_utils
-from apache_beam import Create, assert_that, equal_to
+from apache_beam import Create
from apache_beam.coders import FastPrimitivesCoder
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
class CreateTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 3320d79..f790660 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -33,12 +33,12 @@ from apache_beam.io.iobase import Read
from apache_beam.options.pipeline_options import TypeOptions
import apache_beam.pvalue as pvalue
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
from apache_beam.transforms import window
from apache_beam.transforms.core import GroupByKeyOnly
import apache_beam.transforms.combiners as combine
from apache_beam.transforms.display import DisplayData, DisplayDataItem
from apache_beam.transforms.ptransform import PTransform
-from apache_beam.transforms.util import assert_that, equal_to
import apache_beam.typehints as typehints
from apache_beam.typehints import with_input_types
from apache_beam.typehints import with_output_types
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 0bc9107..6500681 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -24,8 +24,8 @@ from nose.plugins.attrib import attr
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
from apache_beam.transforms import window
-from apache_beam.transforms.util import assert_that, equal_to
class SideInputsTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 2574c4b..a27f47f 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -27,6 +27,7 @@ import yaml
import apache_beam as beam
from apache_beam.runners import pipeline_context
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
from apache_beam.transforms import trigger
from apache_beam.transforms.core import Windowing
from apache_beam.transforms.trigger import AccumulationMode
@@ -40,7 +41,6 @@ from apache_beam.transforms.trigger import GeneralTriggerDriver
from apache_beam.transforms.trigger import InMemoryUnmergedState
from apache_beam.transforms.trigger import Repeatedly
from apache_beam.transforms.trigger import TriggerFn
-from apache_beam.transforms.util import assert_that, equal_to
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.window import IntervalWindow
from apache_beam.transforms.window import MIN_TIMESTAMP
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index a6ecf0a..a7484ac 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -20,14 +20,10 @@
from __future__ import absolute_import
-from apache_beam import pvalue
-from apache_beam.transforms import window
from apache_beam.transforms.core import CombinePerKey
-from apache_beam.transforms.core import Create
from apache_beam.transforms.core import Flatten
from apache_beam.transforms.core import GroupByKey
from apache_beam.transforms.core import Map
-from apache_beam.transforms.core import WindowInto
from apache_beam.transforms.ptransform import PTransform
from apache_beam.transforms.ptransform import ptransform_fn
@@ -38,9 +34,6 @@ __all__ = [
'KvSwap',
'RemoveDuplicates',
'Values',
- 'assert_that',
- 'equal_to',
- 'is_empty',
]
@@ -169,75 +162,3 @@ def RemoveDuplicates(pcoll): # pylint: disable=invalid-name
| 'ToPairs' >> Map(lambda v: (v, None))
| 'Group' >> CombinePerKey(lambda vs: None)
| 'RemoveDuplicates' >> Keys())
-
-
-class BeamAssertException(Exception):
- """Exception raised by matcher classes used by assert_that transform."""
-
- pass
-
-
-# Note that equal_to always sorts the expected and actual since what we
-# compare are PCollections for which there is no guaranteed order.
-# However the sorting does not go beyond top level therefore [1,2] and [2,1]
-# are considered equal and [[1,2]] and [[2,1]] are not.
-# TODO(silviuc): Add contains_in_any_order-style matchers.
-def equal_to(expected):
- expected = list(expected)
-
- def _equal(actual):
- sorted_expected = sorted(expected)
- sorted_actual = sorted(actual)
- if sorted_expected != sorted_actual:
- raise BeamAssertException(
- 'Failed assert: %r == %r' % (sorted_expected, sorted_actual))
- return _equal
-
-
-def is_empty():
- def _empty(actual):
- actual = list(actual)
- if actual:
- raise BeamAssertException(
- 'Failed assert: [] == %r' % actual)
- return _empty
-
-
-def assert_that(actual, matcher, label='assert_that'):
- """A PTransform that checks a PCollection has an expected value.
-
- Note that assert_that should be used only for testing pipelines since the
- check relies on materializing the entire PCollection being checked.
-
- Args:
- actual: A PCollection.
- matcher: A matcher function taking as argument the actual value of a
- materialized PCollection. The matcher validates this actual value against
- expectations and raises BeamAssertException if they are not met.
- label: Optional string label. This is needed in case several assert_that
- transforms are introduced in the same pipeline.
-
- Returns:
- Ignored.
- """
- assert isinstance(actual, pvalue.PCollection)
-
- class AssertThat(PTransform):
-
- def expand(self, pcoll):
- # We must have at least a single element to ensure the matcher
- # code gets run even if the input pcollection is empty.
- keyed_singleton = pcoll.pipeline | Create([(None, None)])
- keyed_actual = (
- pcoll
- | WindowInto(window.GlobalWindows())
- | "ToVoidKey" >> Map(lambda v: (None, v)))
- _ = ((keyed_singleton, keyed_actual)
- | "Group" >> CoGroupByKey()
- | "Unkey" >> Map(lambda (k, (_, actual_values)): actual_values)
- | "Match" >> Map(matcher))
-
- def default_label(self):
- return label
-
- actual | AssertThat() # pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/util_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
deleted file mode 100644
index 7fdef70..0000000
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ /dev/null
@@ -1,50 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the util transforms."""
-
-import unittest
-
-from apache_beam import Create
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that, equal_to, is_empty
-
-
-class UtilTest(unittest.TestCase):
-
- def test_assert_that_passes(self):
- with TestPipeline() as p:
- assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3]))
-
- def test_assert_that_fails(self):
- with self.assertRaises(Exception):
- with TestPipeline() as p:
- assert_that(p | Create([1, 10, 100]), equal_to([1, 2, 3]))
-
- def test_assert_that_fails_on_empty_input(self):
- with self.assertRaises(Exception):
- with TestPipeline() as p:
- assert_that(p | Create([]), equal_to([1, 2, 3]))
-
- def test_assert_that_fails_on_empty_expected(self):
- with self.assertRaises(Exception):
- with TestPipeline() as p:
- assert_that(p | Create([1, 2, 3]), is_empty())
-
-
-if __name__ == '__main__':
- unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index a7797dd..fd1bb9d 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -21,6 +21,7 @@ import unittest
from apache_beam.runners import pipeline_context
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
from apache_beam.transforms import CombinePerKey
from apache_beam.transforms import combiners
from apache_beam.transforms import core
@@ -31,7 +32,6 @@ from apache_beam.transforms import WindowInto
from apache_beam.transforms.core import Windowing
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.transforms.trigger import AfterCount
-from apache_beam.transforms.util import assert_that, equal_to
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.window import GlobalWindow
from apache_beam.transforms.window import GlobalWindows
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/write_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py
index 27e7caa..e31b9cc 100644
--- a/sdks/python/apache_beam/transforms/write_ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py
@@ -23,8 +23,8 @@ import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, is_empty
from apache_beam.transforms.ptransform import PTransform
-from apache_beam.transforms.util import assert_that, is_empty
class _TestSink(iobase.Sink):
http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 3494cfe..589dc0e 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -25,7 +25,7 @@ from apache_beam import pvalue
from apache_beam import typehints
from apache_beam.options.pipeline_options import OptionsContext
from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.util import assert_that, equal_to
+from apache_beam.testing.util import assert_that, equal_to
from apache_beam.typehints import WithTypeHints
# These test often construct a pipeline as value | PTransform to test side
[18/19] beam git commit: Add __all__ tags to modules in package
apache_beam/testing
Posted by al...@apache.org.
Add __all__ tags to modules in package apache_beam/testing
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0f910b43
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0f910b43
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0f910b43
Branch: refs/heads/release-2.0.0
Commit: 0f910b43cbccca73d3b492560b9fc0d3f0901e21
Parents: a6f5888
Author: Charles Chen <cc...@google.com>
Authored: Wed May 10 23:20:20 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:54:12 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/testing/pipeline_verifiers.py | 8 ++++++++
sdks/python/apache_beam/testing/test_pipeline.py | 5 +++++
sdks/python/apache_beam/testing/test_stream.py | 14 +++++++++++++-
sdks/python/apache_beam/testing/test_utils.py | 6 +++++-
4 files changed, 31 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0f910b43/sdks/python/apache_beam/testing/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/pipeline_verifiers.py b/sdks/python/apache_beam/testing/pipeline_verifiers.py
index 5a6082a..a08eb54 100644
--- a/sdks/python/apache_beam/testing/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/testing/pipeline_verifiers.py
@@ -32,6 +32,14 @@ from apache_beam.runners.runner import PipelineState
from apache_beam.testing import test_utils as utils
from apache_beam.utils import retry
+
+__all__ = [
+ 'PipelineStateMatcher',
+ 'FileChecksumMatcher',
+ 'retry_on_io_error_and_server_error',
+ ]
+
+
try:
from apitools.base.py.exceptions import HttpError
except ImportError:
http://git-wip-us.apache.org/repos/asf/beam/blob/0f910b43/sdks/python/apache_beam/testing/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py
index 20f4839..13b1639 100644
--- a/sdks/python/apache_beam/testing/test_pipeline.py
+++ b/sdks/python/apache_beam/testing/test_pipeline.py
@@ -27,6 +27,11 @@ from apache_beam.options.pipeline_options import PipelineOptions
from nose.plugins.skip import SkipTest
+__all__ = [
+ 'TestPipeline',
+ ]
+
+
class TestPipeline(Pipeline):
"""TestPipeline class is used inside of Beam tests that can be configured to
run against pipeline runner.
http://git-wip-us.apache.org/repos/asf/beam/blob/0f910b43/sdks/python/apache_beam/testing/test_stream.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py
index 7ae27b7..a06bcd0 100644
--- a/sdks/python/apache_beam/testing/test_stream.py
+++ b/sdks/python/apache_beam/testing/test_stream.py
@@ -15,7 +15,10 @@
# limitations under the License.
#
-"""Provides TestStream for verifying streaming runner semantics."""
+"""Provides TestStream for verifying streaming runner semantics.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
from abc import ABCMeta
from abc import abstractmethod
@@ -28,6 +31,15 @@ from apache_beam.utils import timestamp
from apache_beam.utils.windowed_value import WindowedValue
+__all__ = [
+ 'Event',
+ 'ElementEvent',
+ 'WatermarkEvent',
+ 'ProcessingTimeEvent',
+ 'TestStream',
+ ]
+
+
class Event(object):
"""Test stream event to be emitted during execution of a TestStream."""
http://git-wip-us.apache.org/repos/asf/beam/blob/0f910b43/sdks/python/apache_beam/testing/test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_utils.py b/sdks/python/apache_beam/testing/test_utils.py
index 666207e..9feb80e 100644
--- a/sdks/python/apache_beam/testing/test_utils.py
+++ b/sdks/python/apache_beam/testing/test_utils.py
@@ -15,7 +15,10 @@
# limitations under the License.
#
-"""Utility methods for testing"""
+"""Utility methods for testing
+
+For internal use only; no backwards-compatibility guarantees.
+"""
import hashlib
import imp
@@ -23,6 +26,7 @@ from mock import Mock, patch
from apache_beam.utils import retry
+
DEFAULT_HASHING_ALG = 'sha1'
[05/19] beam git commit: Add internal usage only comments to util/
Posted by al...@apache.org.
Add internal usage only comments to util/
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9730f56e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9730f56e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9730f56e
Branch: refs/heads/release-2.0.0
Commit: 9730f56eaf62c8164a517d5fe0cc03650c39a732
Parents: 65aa0ff
Author: Ahmet Altay <al...@google.com>
Authored: Thu May 11 11:34:59 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/utils/__init__.py | 5 ++++-
sdks/python/apache_beam/utils/annotations.py | 4 +++-
sdks/python/apache_beam/utils/counters.py | 5 ++++-
sdks/python/apache_beam/utils/processes.py | 6 +++++-
sdks/python/apache_beam/utils/profiler.py | 5 ++++-
sdks/python/apache_beam/utils/proto_utils.py | 2 ++
sdks/python/apache_beam/utils/retry.py | 2 ++
sdks/python/apache_beam/utils/timestamp.py | 5 ++++-
sdks/python/apache_beam/utils/urns.py | 2 ++
9 files changed, 30 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9730f56e/sdks/python/apache_beam/utils/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/__init__.py b/sdks/python/apache_beam/utils/__init__.py
index 74cf45d..635c80f 100644
--- a/sdks/python/apache_beam/utils/__init__.py
+++ b/sdks/python/apache_beam/utils/__init__.py
@@ -15,4 +15,7 @@
# limitations under the License.
#
-"""A package containing utilities."""
+"""A package containing internal utilities.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
http://git-wip-us.apache.org/repos/asf/beam/blob/9730f56e/sdks/python/apache_beam/utils/annotations.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/annotations.py b/sdks/python/apache_beam/utils/annotations.py
index 92318b1..017dd6b 100644
--- a/sdks/python/apache_beam/utils/annotations.py
+++ b/sdks/python/apache_beam/utils/annotations.py
@@ -15,7 +15,9 @@
# limitations under the License.
#
-""" Deprecated and experimental annotations.
+"""Deprecated and experimental annotations.
+
+For internal use only; no backwards-compatibility guarantees.
Annotations come in two flavors: deprecated and experimental
http://git-wip-us.apache.org/repos/asf/beam/blob/9730f56e/sdks/python/apache_beam/utils/counters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/counters.py b/sdks/python/apache_beam/utils/counters.py
index e41d732..b379461 100644
--- a/sdks/python/apache_beam/utils/counters.py
+++ b/sdks/python/apache_beam/utils/counters.py
@@ -18,7 +18,10 @@
# cython: profile=False
# cython: overflowcheck=True
-"""Counters collect the progress of the Worker for reporting to the service."""
+"""Counters collect the progress of the Worker for reporting to the service.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
import threading
from apache_beam.transforms import cy_combiners
http://git-wip-us.apache.org/repos/asf/beam/blob/9730f56e/sdks/python/apache_beam/utils/processes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/processes.py b/sdks/python/apache_beam/utils/processes.py
index e089090..e5fd9c8 100644
--- a/sdks/python/apache_beam/utils/processes.py
+++ b/sdks/python/apache_beam/utils/processes.py
@@ -14,7 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-"""Cross-platform utilities for creating subprocesses."""
+
+"""Cross-platform utilities for creating subprocesses.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
import platform
import subprocess
http://git-wip-us.apache.org/repos/asf/beam/blob/9730f56e/sdks/python/apache_beam/utils/profiler.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/profiler.py b/sdks/python/apache_beam/utils/profiler.py
index 852b659..a2c3f6a 100644
--- a/sdks/python/apache_beam/utils/profiler.py
+++ b/sdks/python/apache_beam/utils/profiler.py
@@ -15,7 +15,10 @@
# limitations under the License.
#
-"""A profiler context manager based on cProfile.Profile objects."""
+"""A profiler context manager based on cProfile.Profile objects.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
import cProfile
import logging
http://git-wip-us.apache.org/repos/asf/beam/blob/9730f56e/sdks/python/apache_beam/utils/proto_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/proto_utils.py b/sdks/python/apache_beam/utils/proto_utils.py
index d929a92..090a821 100644
--- a/sdks/python/apache_beam/utils/proto_utils.py
+++ b/sdks/python/apache_beam/utils/proto_utils.py
@@ -15,6 +15,8 @@
# limitations under the License.
#
+"""For internal use only; no backwards-compatibility guarantees."""
+
from google.protobuf import any_pb2
from google.protobuf import struct_pb2
http://git-wip-us.apache.org/repos/asf/beam/blob/9730f56e/sdks/python/apache_beam/utils/retry.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py
index 2c32f0f..1a8b907 100644
--- a/sdks/python/apache_beam/utils/retry.py
+++ b/sdks/python/apache_beam/utils/retry.py
@@ -17,6 +17,8 @@
"""Retry decorators for calls raising exceptions.
+For internal use only; no backwards-compatibility guarantees.
+
This module is used mostly to decorate all integration points where the code
makes calls to remote services. Searching through the code base for @retry
should find all such places. For this reason even places where retry is not
http://git-wip-us.apache.org/repos/asf/beam/blob/9730f56e/sdks/python/apache_beam/utils/timestamp.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/timestamp.py b/sdks/python/apache_beam/utils/timestamp.py
index 8b2ccda..5d1b48c 100644
--- a/sdks/python/apache_beam/utils/timestamp.py
+++ b/sdks/python/apache_beam/utils/timestamp.py
@@ -15,7 +15,10 @@
# limitations under the License.
#
-"""Timestamp utilities."""
+"""Timestamp utilities.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
from __future__ import absolute_import
http://git-wip-us.apache.org/repos/asf/beam/blob/9730f56e/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 46bd8f5..379b5ff 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -15,6 +15,8 @@
# limitations under the License.
#
+"""For internal use only; no backwards-compatibility guarantees."""
+
import abc
import inspect
[13/19] beam git commit: Remove some internal details from the public
API.
Posted by al...@apache.org.
Remove some internal details from the public API.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e12cf0df
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e12cf0df
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e12cf0df
Branch: refs/heads/release-2.0.0
Commit: e12cf0dfd810ec26965ae98e5a2256f560c6ff6d
Parents: 2070f11
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed May 10 15:55:46 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:37 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 4 ++--
.../runners/dataflow/dataflow_runner.py | 6 +++---
.../runners/dataflow/dataflow_runner_test.py | 6 +++---
.../apache_beam/runners/direct/executor.py | 2 +-
.../runners/direct/transform_evaluator.py | 10 ++++-----
sdks/python/apache_beam/transforms/__init__.py | 2 +-
sdks/python/apache_beam/transforms/core.py | 22 ++++++++++----------
.../python/apache_beam/transforms/ptransform.py | 6 +++---
.../apache_beam/transforms/ptransform_test.py | 12 +++++------
sdks/python/apache_beam/typehints/typecheck.py | 2 +-
10 files changed, 36 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 79480d7..5048534 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -77,8 +77,8 @@ class Pipeline(object):
the PValues are the edges.
All the transforms applied to the pipeline must have distinct full labels.
- If same transform instance needs to be applied then a clone should be created
- with a new label (e.g., transform.clone('new label')).
+ If same transform instance needs to be applied then the right shift operator
+ should be used to designate new names (e.g. `input | "label" >> my_tranform`).
"""
def __init__(self, runner=None, options=None, argv=None):
http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index da8de9d..0ecd22a 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -160,7 +160,7 @@ class DataflowRunner(PipelineRunner):
class GroupByKeyInputVisitor(PipelineVisitor):
"""A visitor that replaces `Any` element type for input `PCollection` of
- a `GroupByKey` or `GroupByKeyOnly` with a `KV` type.
+ a `GroupByKey` or `_GroupByKeyOnly` with a `KV` type.
TODO(BEAM-115): Once Python SDk is compatible with the new Runner API,
we could directly replace the coder instead of mutating the element type.
@@ -169,8 +169,8 @@ class DataflowRunner(PipelineRunner):
def visit_transform(self, transform_node):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam.transforms.core import GroupByKey, GroupByKeyOnly
- if isinstance(transform_node.transform, (GroupByKey, GroupByKeyOnly)):
+ from apache_beam.transforms.core import GroupByKey, _GroupByKeyOnly
+ if isinstance(transform_node.transform, (GroupByKey, _GroupByKeyOnly)):
pcoll = transform_node.inputs[0]
input_type = pcoll.element_type
# If input_type is not specified, then treat it as `Any`.
http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index ac9b028..ff4b51d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -37,7 +37,7 @@ from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeExceptio
from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.display import DisplayDataItem
-from apache_beam.transforms.core import GroupByKeyOnly
+from apache_beam.transforms.core import _GroupByKeyOnly
from apache_beam.typehints import typehints
# Protect against environments where apitools library is not available.
@@ -185,7 +185,7 @@ class DataflowRunnerTest(unittest.TestCase):
pcoll1 = PCollection(p)
pcoll2 = PCollection(p)
pcoll3 = PCollection(p)
- for transform in [GroupByKeyOnly(), beam.GroupByKey()]:
+ for transform in [_GroupByKeyOnly(), beam.GroupByKey()]:
pcoll1.element_type = None
pcoll2.element_type = typehints.Any
pcoll3.element_type = typehints.KV[typehints.Any, typehints.Any]
@@ -199,7 +199,7 @@ class DataflowRunnerTest(unittest.TestCase):
p = TestPipeline()
pcoll1 = PCollection(p)
pcoll2 = PCollection(p)
- for transform in [GroupByKeyOnly(), beam.GroupByKey()]:
+ for transform in [_GroupByKeyOnly(), beam.GroupByKey()]:
pcoll1.element_type = typehints.TupleSequenceConstraint
pcoll2.element_type = typehints.Set
err_msg = "Input to GroupByKey must be of Tuple or Any type"
http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 9efbede..86db291 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -155,7 +155,7 @@ class _SerialEvaluationState(_TransformEvaluationState):
item of work will be submitted to the ExecutorService at any time.
A principal use of this is for evaluators that keeps a global state such as
- GroupByKeyOnly.
+ _GroupByKeyOnly.
"""
def __init__(self, executor_service, scheduled):
http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 6984ded..b1cb626 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -53,7 +53,7 @@ class TransformEvaluatorRegistry(object):
io.Read: _BoundedReadEvaluator,
core.Flatten: _FlattenEvaluator,
core.ParDo: _ParDoEvaluator,
- core.GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
+ core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
_NativeWrite: _NativeWriteEvaluator,
}
@@ -83,7 +83,7 @@ class TransformEvaluatorRegistry(object):
"""Returns True if this applied_ptransform should run one bundle at a time.
Some TransformEvaluators use a global state object to keep track of their
- global execution state. For example evaluator for GroupByKeyOnly uses this
+ global execution state. For example evaluator for _GroupByKeyOnly uses this
state as an in memory dictionary to buffer keys.
Serially executed evaluators will act as syncing point in the graph and
@@ -99,7 +99,7 @@ class TransformEvaluatorRegistry(object):
True if executor should execute applied_ptransform serially.
"""
return isinstance(applied_ptransform.transform,
- (core.GroupByKeyOnly, _NativeWrite))
+ (core._GroupByKeyOnly, _NativeWrite))
class _TransformEvaluator(object):
@@ -325,7 +325,7 @@ class _ParDoEvaluator(_TransformEvaluator):
class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
- """TransformEvaluator for GroupByKeyOnly transform."""
+ """TransformEvaluator for _GroupByKeyOnly transform."""
MAX_ELEMENT_PER_BUNDLE = None
@@ -369,7 +369,7 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
k, v = element.value
self.state.output[self.key_coder.encode(k)].append(v)
else:
- raise TypeCheckError('Input to GroupByKeyOnly must be a PCollection of '
+ raise TypeCheckError('Input to _GroupByKeyOnly must be a PCollection of '
'windowed key-value pairs. Instead received: %r.'
% element)
http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/transforms/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py
index 847fb8f..b77b0f6 100644
--- a/sdks/python/apache_beam/transforms/__init__.py
+++ b/sdks/python/apache_beam/transforms/__init__.py
@@ -21,5 +21,5 @@
from apache_beam.transforms import combiners
from apache_beam.transforms.core import *
from apache_beam.transforms.ptransform import *
-from apache_beam.transforms.timeutil import *
+from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.util import *
http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index abe699f..0e497f9 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -215,7 +215,7 @@ class DoFn(WithTypeHints, HasDisplayData):
return Any
return type_hint
- def process_argspec_fn(self):
+ def _process_argspec_fn(self):
"""Returns the Python callable that will eventually be invoked.
This should ideally be the user-level function that is called with
@@ -307,7 +307,7 @@ class CallableWrapperDoFn(DoFn):
return self._strip_output_annotations(
trivial_inference.infer_return_type(self._fn, [input_type]))
- def process_argspec_fn(self):
+ def _process_argspec_fn(self):
return getattr(self._fn, '_argspec_fn', self._fn)
@@ -641,8 +641,8 @@ class ParDo(PTransformWithSideInputs):
return fn
return CallableWrapperDoFn(fn)
- def process_argspec_fn(self):
- return self.fn.process_argspec_fn()
+ def _process_argspec_fn(self):
+ return self.fn._process_argspec_fn()
def display_data(self):
return {'fn': DisplayDataItem(self.fn.__class__,
@@ -870,19 +870,19 @@ class CombineGlobally(PTransform):
def default_label(self):
return 'CombineGlobally(%s)' % ptransform.label_from_callable(self.fn)
- def clone(self, **extra_attributes):
+ def _clone(self, **extra_attributes):
clone = copy.copy(self)
clone.__dict__.update(extra_attributes)
return clone
def with_defaults(self, has_defaults=True):
- return self.clone(has_defaults=has_defaults)
+ return self._clone(has_defaults=has_defaults)
def without_defaults(self):
return self.with_defaults(False)
def as_singleton_view(self):
- return self.clone(as_view=True)
+ return self._clone(as_view=True)
def expand(self, pcoll):
def add_input_types(transform):
@@ -964,7 +964,7 @@ class CombinePerKey(PTransformWithSideInputs):
def default_label(self):
return '%s(%s)' % (self.__class__.__name__, self._fn_label)
- def process_argspec_fn(self):
+ def _process_argspec_fn(self):
return self.fn._fn # pylint: disable=protected-access
def expand(self, pcoll):
@@ -1133,7 +1133,7 @@ class GroupByKey(PTransform):
return (pcoll
| 'ReifyWindows' >> (ParDo(self.ReifyWindows())
.with_output_types(reify_output_type))
- | 'GroupByKey' >> (GroupByKeyOnly()
+ | 'GroupByKey' >> (_GroupByKeyOnly()
.with_input_types(reify_output_type)
.with_output_types(gbk_input_type))
| ('GroupByWindow' >> ParDo(
@@ -1144,14 +1144,14 @@ class GroupByKey(PTransform):
# The input_type is None, run the default
return (pcoll
| 'ReifyWindows' >> ParDo(self.ReifyWindows())
- | 'GroupByKey' >> GroupByKeyOnly()
+ | 'GroupByKey' >> _GroupByKeyOnly()
| 'GroupByWindow' >> ParDo(
self.GroupAlsoByWindow(pcoll.windowing)))
@typehints.with_input_types(typehints.KV[K, V])
@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
-class GroupByKeyOnly(PTransform):
+class _GroupByKeyOnly(PTransform):
"""A group by key transform, ignoring windows."""
def infer_output_type(self, input_type):
key_type, value_type = trivial_inference.key_value_types(input_type)
http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 8898c36..bd2a120 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -303,7 +303,7 @@ class PTransform(WithTypeHints, HasDisplayData):
# TODO(ccy): further refine this API.
return None
- def clone(self, new_label):
+ def _clone(self, new_label):
"""Clones the current transform instance under a new label."""
transform = copy.copy(self)
transform.label = new_label
@@ -567,7 +567,7 @@ class PTransformWithSideInputs(PTransform):
arg_types = [pvalueish.element_type] + [element_type(v) for v in args]
kwargs_types = {k: element_type(v) for (k, v) in kwargs.items()}
- argspec_fn = self.process_argspec_fn()
+ argspec_fn = self._process_argspec_fn()
bindings = getcallargs_forhints(argspec_fn, *arg_types, **kwargs_types)
hints = getcallargs_forhints(argspec_fn, *type_hints[0], **type_hints[1])
for arg, hint in hints.items():
@@ -581,7 +581,7 @@ class PTransformWithSideInputs(PTransform):
'Type hint violation for \'%s\': requires %s but got %s for %s'
% (self.label, hint, bindings[arg], arg))
- def process_argspec_fn(self):
+ def _process_argspec_fn(self):
"""Returns an argspec of the function actually consuming the data.
"""
raise NotImplementedError
http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index f790660..efc5978 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -35,7 +35,7 @@ import apache_beam.pvalue as pvalue
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
from apache_beam.transforms import window
-from apache_beam.transforms.core import GroupByKeyOnly
+from apache_beam.transforms.core import _GroupByKeyOnly
import apache_beam.transforms.combiners as combine
from apache_beam.transforms.display import DisplayData, DisplayDataItem
from apache_beam.transforms.ptransform import PTransform
@@ -580,7 +580,7 @@ class PTransformTest(unittest.TestCase):
pipeline = TestPipeline()
pcolls = pipeline | 'A' >> beam.Create(['a', 'b', 'f'])
with self.assertRaises(typehints.TypeCheckError) as cm:
- pcolls | 'D' >> GroupByKeyOnly()
+ pcolls | 'D' >> _GroupByKeyOnly()
pipeline.run()
expected_error_prefix = ('Input type hint violation at D: expected '
@@ -1088,7 +1088,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| 'Str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
| ('Pair' >> beam.Map(lambda x: (x, ord(x)))
.with_output_types(typehints.KV[str, str]))
- | GroupByKeyOnly())
+ | _GroupByKeyOnly())
# Output type should correctly be deduced.
# GBK-only should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
@@ -1112,7 +1112,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
| beam.Create([1, 2, 3]).with_output_types(int)
- | 'F' >> GroupByKeyOnly())
+ | 'F' >> _GroupByKeyOnly())
self.assertEqual("Input type hint violation at F: "
"expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -1155,7 +1155,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
(self.p
| 'Nums' >> beam.Create(range(5)).with_output_types(int)
| 'ModDup' >> beam.Map(lambda x: (x % 2, x))
- | GroupByKeyOnly())
+ | _GroupByKeyOnly())
self.assertEqual('Pipeline type checking is enabled, however no output '
'type-hint was found for the PTransform '
@@ -1978,7 +1978,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_gbk_type_inference(self):
self.assertEqual(
typehints.Tuple[str, typehints.Iterable[int]],
- GroupByKeyOnly().infer_output_type(typehints.KV[str, int]))
+ _GroupByKeyOnly().infer_output_type(typehints.KV[str, int]))
def test_pipeline_inference(self):
created = self.p | beam.Create(['a', 'b', 'c'])
http://git-wip-us.apache.org/repos/asf/beam/blob/e12cf0df/sdks/python/apache_beam/typehints/typecheck.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py
index 09b73f9..89a5f5c 100644
--- a/sdks/python/apache_beam/typehints/typecheck.py
+++ b/sdks/python/apache_beam/typehints/typecheck.py
@@ -109,7 +109,7 @@ class TypeCheckWrapperDoFn(AbstractDoFnWrapper):
def __init__(self, dofn, type_hints, label=None):
super(TypeCheckWrapperDoFn, self).__init__(dofn)
self.dofn = dofn
- self._process_fn = self.dofn.process_argspec_fn()
+ self._process_fn = self.dofn._process_argspec_fn()
if type_hints.input_types:
input_args, input_kwargs = type_hints.input_types
self._input_hints = getcallargs_forhints(
[19/19] beam git commit: This closes #3103
Posted by al...@apache.org.
This closes #3103
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1cc32c65
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1cc32c65
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1cc32c65
Branch: refs/heads/release-2.0.0
Commit: 1cc32c65ea9f49c78d22999823884e1363a921d4
Parents: 65aa0ff 0f910b4
Author: Ahmet Altay <al...@google.com>
Authored: Thu May 11 17:11:38 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 17:11:38 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coder_impl.py | 2 +
sdks/python/apache_beam/coders/coders.py | 32 +++++-
sdks/python/apache_beam/coders/coders_test.py | 11 +-
.../apache_beam/coders/coders_test_common.py | 2 +-
sdks/python/apache_beam/coders/observable.py | 5 +-
sdks/python/apache_beam/coders/slow_stream.py | 5 +-
.../apache_beam/coders/standard_coders_test.py | 2 +-
sdks/python/apache_beam/coders/stream.pyx | 5 +
sdks/python/apache_beam/coders/typecoders.py | 3 +
.../examples/complete/autocomplete_test.py | 4 +-
.../examples/complete/estimate_pi_test.py | 4 +-
.../complete/game/hourly_team_score_test.py | 4 +-
.../examples/complete/game/user_score_test.py | 4 +-
.../apache_beam/examples/complete/tfidf_test.py | 4 +-
.../complete/top_wikipedia_sessions_test.py | 4 +-
.../cookbook/bigquery_side_input_test.py | 4 +-
.../cookbook/bigquery_tornadoes_test.py | 6 +-
.../examples/cookbook/coders_test.py | 4 +-
.../examples/cookbook/combiners_test.py | 6 +-
.../examples/cookbook/custom_ptransform_test.py | 4 +-
.../examples/cookbook/filters_test.py | 12 ++-
.../examples/cookbook/mergecontacts.py | 14 +--
.../apache_beam/examples/snippets/snippets.py | 17 +--
.../examples/snippets/snippets_test.py | 35 +++---
.../apache_beam/examples/wordcount_debugging.py | 6 +-
sdks/python/apache_beam/internal/__init__.py | 2 +
.../python/apache_beam/internal/gcp/__init__.py | 2 +
sdks/python/apache_beam/io/avroio_test.py | 4 +-
sdks/python/apache_beam/io/concat_source.py | 4 +-
.../python/apache_beam/io/concat_source_test.py | 4 +-
sdks/python/apache_beam/io/filebasedsource.py | 2 +
.../apache_beam/io/filebasedsource_test.py | 4 +-
sdks/python/apache_beam/io/fileio.py | 7 ++
sdks/python/apache_beam/io/fileio_test.py | 2 +-
sdks/python/apache_beam/io/filesystem.py | 3 +
sdks/python/apache_beam/io/filesystems.py | 2 +
.../io/gcp/datastore/v1/fake_datastore.py | 6 +-
.../apache_beam/io/gcp/datastore/v1/helper.py | 5 +-
sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 2 +
sdks/python/apache_beam/io/gcp/gcsio.py | 3 +
sdks/python/apache_beam/io/gcp/pubsub.py | 2 +
.../io/gcp/tests/bigquery_matcher.py | 3 +
sdks/python/apache_beam/io/iobase.py | 2 +
sdks/python/apache_beam/io/localfilesystem.py | 2 +
sdks/python/apache_beam/io/range_trackers.py | 7 +-
sdks/python/apache_beam/io/source_test_utils.py | 8 ++
sdks/python/apache_beam/io/sources_test.py | 4 +-
sdks/python/apache_beam/io/textio.py | 2 +-
sdks/python/apache_beam/io/textio_test.py | 5 +-
sdks/python/apache_beam/io/tfrecordio_test.py | 24 +++--
sdks/python/apache_beam/metrics/__init__.py | 1 +
sdks/python/apache_beam/metrics/cells.py | 2 +
sdks/python/apache_beam/metrics/execution.py | 6 +-
sdks/python/apache_beam/metrics/metric.py | 4 +
sdks/python/apache_beam/metrics/metricbase.py | 2 +
.../apache_beam/options/pipeline_options.py | 20 +++-
.../options/pipeline_options_validator.py | 2 +
.../apache_beam/options/value_provider.py | 8 ++
sdks/python/apache_beam/pipeline.py | 13 ++-
sdks/python/apache_beam/pipeline_test.py | 4 +-
sdks/python/apache_beam/pvalue.py | 11 ++
sdks/python/apache_beam/runners/api/__init__.py | 4 +-
sdks/python/apache_beam/runners/common.py | 5 +-
.../apache_beam/runners/dataflow/__init__.py | 9 ++
.../runners/dataflow/dataflow_runner.py | 9 +-
.../runners/dataflow/dataflow_runner_test.py | 5 +-
.../runners/dataflow/test_dataflow_runner.py | 3 +
.../apache_beam/runners/direct/__init__.py | 6 +-
.../apache_beam/runners/direct/direct_runner.py | 3 +
.../apache_beam/runners/direct/executor.py | 2 +-
.../runners/direct/transform_evaluator.py | 10 +-
.../apache_beam/runners/pipeline_context.py | 6 ++
.../apache_beam/runners/portability/__init__.py | 2 +
.../portability/maptask_executor_runner.py | 2 +-
.../portability/maptask_executor_runner_test.py | 6 +-
sdks/python/apache_beam/runners/runner.py | 3 +
sdks/python/apache_beam/runners/runner_test.py | 4 +-
.../apache_beam/runners/worker/__init__.py | 2 +
.../apache_beam/testing/pipeline_verifiers.py | 8 ++
.../python/apache_beam/testing/test_pipeline.py | 5 +
sdks/python/apache_beam/testing/test_stream.py | 14 ++-
sdks/python/apache_beam/testing/test_utils.py | 6 +-
sdks/python/apache_beam/testing/util.py | 107 +++++++++++++++++++
sdks/python/apache_beam/testing/util_test.py | 50 +++++++++
sdks/python/apache_beam/transforms/__init__.py | 2 +-
.../apache_beam/transforms/combiners_test.py | 2 +-
sdks/python/apache_beam/transforms/core.py | 50 ++++++---
.../apache_beam/transforms/create_test.py | 3 +-
.../apache_beam/transforms/cy_combiners.py | 5 +-
.../python/apache_beam/transforms/ptransform.py | 29 +++--
.../apache_beam/transforms/ptransform_test.py | 13 +--
.../python/apache_beam/transforms/sideinputs.py | 2 +
.../apache_beam/transforms/sideinputs_test.py | 2 +-
sdks/python/apache_beam/transforms/timeutil.py | 5 +
sdks/python/apache_beam/transforms/trigger.py | 14 +++
.../apache_beam/transforms/trigger_test.py | 2 +-
sdks/python/apache_beam/transforms/util.py | 79 --------------
sdks/python/apache_beam/transforms/util_test.py | 50 ---------
sdks/python/apache_beam/transforms/window.py | 17 ++-
.../apache_beam/transforms/window_test.py | 2 +-
.../transforms/write_ptransform_test.py | 2 +-
sdks/python/apache_beam/typehints/decorators.py | 17 ++-
sdks/python/apache_beam/typehints/opcodes.py | 2 +
.../apache_beam/typehints/trivial_inference.py | 2 +
sdks/python/apache_beam/typehints/typecheck.py | 17 +--
.../typehints/typed_pipeline_test.py | 2 +-
sdks/python/apache_beam/typehints/typehints.py | 18 ++++
.../apache_beam/typehints/typehints_test.py | 19 ++--
sdks/python/apache_beam/utils/__init__.py | 5 +-
sdks/python/apache_beam/utils/annotations.py | 4 +-
sdks/python/apache_beam/utils/counters.py | 5 +-
sdks/python/apache_beam/utils/processes.py | 6 +-
sdks/python/apache_beam/utils/profiler.py | 5 +-
sdks/python/apache_beam/utils/proto_utils.py | 2 +
sdks/python/apache_beam/utils/retry.py | 2 +
sdks/python/apache_beam/utils/timestamp.py | 5 +-
sdks/python/apache_beam/utils/urns.py | 2 +
sdks/python/apache_beam/utils/windowed_value.py | 4 +-
118 files changed, 724 insertions(+), 326 deletions(-)
----------------------------------------------------------------------
[02/19] beam git commit: fix lint error in fake_datastore.py
Posted by al...@apache.org.
fix lint error in fake_datastore.py
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dec27d8f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dec27d8f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dec27d8f
Branch: refs/heads/release-2.0.0
Commit: dec27d8f205f7a27e427ae9e451c4c875fa8cf04
Parents: 7fd012b
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Thu May 11 14:39:55 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/dec27d8f/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py b/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
index 0caf6d6..2332579 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
@@ -31,6 +31,7 @@ except ImportError:
pass
# pylint: enable=wrong-import-order, wrong-import-position
+
def create_run_query(entities, batch_size):
"""A fake datastore run_query method that returns entities in batches.
[09/19] beam git commit: [BEAM-1340] Adds __all__ tags to classes in
package apache_beam/io.
Posted by al...@apache.org.
[BEAM-1340] Adds __all__ tags to classes in package apache_beam/io.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0c784f95
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0c784f95
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0c784f95
Branch: refs/heads/release-2.0.0
Commit: 0c784f958f11eb0e448cc448d2b9d36fd90e9972
Parents: dec27d8
Author: chamikara@google.com <ch...@google.com>
Authored: Thu May 11 11:46:46 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/concat_source.py | 4 +++-
sdks/python/apache_beam/io/filebasedsource.py | 2 ++
sdks/python/apache_beam/io/fileio.py | 7 +++++++
sdks/python/apache_beam/io/filesystem.py | 3 +++
sdks/python/apache_beam/io/filesystems.py | 2 ++
sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 2 ++
sdks/python/apache_beam/io/gcp/gcsio.py | 3 +++
sdks/python/apache_beam/io/gcp/pubsub.py | 2 ++
sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py | 3 +++
sdks/python/apache_beam/io/iobase.py | 2 ++
sdks/python/apache_beam/io/localfilesystem.py | 2 ++
sdks/python/apache_beam/io/range_trackers.py | 7 ++++++-
sdks/python/apache_beam/io/source_test_utils.py | 8 ++++++++
13 files changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/concat_source.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source.py b/sdks/python/apache_beam/io/concat_source.py
index dfd1695..56c4cca 100644
--- a/sdks/python/apache_beam/io/concat_source.py
+++ b/sdks/python/apache_beam/io/concat_source.py
@@ -15,7 +15,9 @@
# limitations under the License.
#
-"""Concat Source, which reads the union of several other sources.
+"""For internal use only; no backwards-compatibility guarantees.
+
+Concat Source, which reads the union of several other sources.
"""
import bisect
http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 215e015..bb9efc4 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -38,6 +38,8 @@ from apache_beam.options.value_provider import check_accessible
MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25
+__all__ = ['FileBasedSource']
+
class FileBasedSource(iobase.BoundedSource):
"""A ``BoundedSource`` for reading a file glob of a given type."""
http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index ca3a759..aa18093 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -37,6 +37,8 @@ from apache_beam.options.value_provider import check_accessible
DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN'
+__all__ = ['FileBasedSink']
+
class FileSink(iobase.Sink):
"""A sink to a GCS or local files.
@@ -280,6 +282,11 @@ class FileSink(iobase.Sink):
return type(self) == type(other) and self.__dict__ == other.__dict__
+# Using FileBasedSink for the public API to be symmetric with FileBasedSource.
+# TODO: move code from FileSink to here and delete that class.
+FileBasedSink = FileSink
+
+
class FileSinkWriter(iobase.Writer):
"""The writer for FileSink.
"""
http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index 3d35f3e..db6a1d0 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -30,6 +30,9 @@ logger = logging.getLogger(__name__)
DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
+__all__ = ['CompressionTypes', 'CompressedFile', 'FileMetadata', 'FileSystem',
+ 'MatchResult']
+
class CompressionTypes(object):
"""Enum-like class representing known compression types."""
http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/filesystems.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py
index 29f0644..e039686 100644
--- a/sdks/python/apache_beam/io/filesystems.py
+++ b/sdks/python/apache_beam/io/filesystems.py
@@ -33,6 +33,8 @@ except ImportError:
pass
# pylint: enable=wrong-import-position, unused-import
+__all__ = ['FileSystems']
+
class FileSystems(object):
"""A class that defines the functions that can be performed on a filesystem.
http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index dc71fce..ce8b5e6 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -26,6 +26,8 @@ from apache_beam.io.filesystem import FileSystem
from apache_beam.io.filesystem import MatchResult
from apache_beam.io.gcp import gcsio
+__all__ = ['GCSFileSystem']
+
class GCSFileSystem(FileSystem):
"""A GCS ``FileSystem`` implementation for accessing files on GCS.
http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/gcp/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index c76c99d..774ee54 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -34,6 +34,9 @@ import traceback
from apache_beam.utils import retry
+__all__ = ['GcsIO']
+
+
# Issue a friendlier error message if the storage library is not available.
# TODO(silviuc): Remove this guard when storage is available everywhere.
try:
http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/gcp/pubsub.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index efc628d..103fce0 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -26,6 +26,8 @@ from apache_beam import coders
from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
from apache_beam.transforms.display import DisplayDataItem
+__all__ = ['PubSubSink', 'PubSubSource']
+
class PubSubSource(dataflow_io.NativeSource):
"""Source for reading from a given Cloud Pub/Sub topic.
http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index f42b70f..844cbc5 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -24,6 +24,9 @@ from hamcrest.core.base_matcher import BaseMatcher
from apache_beam.testing.test_utils import compute_hash
from apache_beam.utils import retry
+__all__ = ['BigqueryMatcher']
+
+
# Protect against environments where bigquery library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
try:
http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index e8ffb72..a80b12f 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -46,6 +46,8 @@ from apache_beam.transforms.display import HasDisplayData
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.utils.windowed_value import WindowedValue
+__all__ = ['BoundedSource', 'RangeTracker', 'Read', 'Sink', 'Write', 'Writer']
+
# Encapsulates information about a bundle of a source generated when method
# BoundedSource.split() is invoked.
http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/localfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index c670704..b08ac49 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -29,6 +29,8 @@ from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.io.filesystem import MatchResult
+__all__ = ['LocalFileSystem']
+
class LocalFileSystem(FileSystem):
"""A Local ``FileSystem`` implementation for accessing files on disk.
http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/range_trackers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index 000df81..9cb36e7 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -24,6 +24,9 @@ import threading
from apache_beam.io import iobase
+__all__ = ['OffsetRangeTracker', 'LexicographicKeyRangeTracker',
+ 'OrderedPositionRangeTracker', 'UnsplittableRangeTracker']
+
class OffsetRangeTracker(iobase.RangeTracker):
"""A 'RangeTracker' for non-negative positions of type 'long'."""
@@ -191,7 +194,9 @@ class OffsetRangeTracker(iobase.RangeTracker):
class GroupedShuffleRangeTracker(iobase.RangeTracker):
- """A 'RangeTracker' for positions used by'GroupedShuffleReader'.
+ """For internal use only; no backwards-compatibility guarantees.
+
+ A 'RangeTracker' for positions used by'GroupedShuffleReader'.
These positions roughly correspond to hashes of keys. In case of hash
collisions, multiple groups can have the same position. In that case, the
http://git-wip-us.apache.org/repos/asf/beam/blob/0c784f95/sdks/python/apache_beam/io/source_test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py
index edb6409..a144a8a 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -52,6 +52,14 @@ import weakref
from multiprocessing.pool import ThreadPool
from apache_beam.io import iobase
+__all__ = ['read_from_source', 'assert_sources_equal_reference_source',
+ 'assert_reentrant_reads_succeed',
+ 'assert_split_at_fraction_behavior',
+ 'assert_split_at_fraction_binary',
+ 'assert_split_at_fraction_exhaustive',
+ 'assert_split_at_fraction_fails',
+ 'assert_split_at_fraction_succeeds_and_consistent']
+
class ExpectedSplitOutcome(object):
MUST_SUCCEED_AND_BE_CONSISTENT = 1
[07/19] beam git commit: Add internal comments to metrics
Posted by al...@apache.org.
Add internal comments to metrics
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a6543abb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a6543abb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a6543abb
Branch: refs/heads/release-2.0.0
Commit: a6543abb37e65fe6c7afcaf3ea670c75aee1f7d0
Parents: 6d02da0
Author: Ahmet Altay <al...@google.com>
Authored: Thu May 11 14:50:33 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/metrics/__init__.py | 1 +
sdks/python/apache_beam/metrics/cells.py | 2 ++
sdks/python/apache_beam/metrics/execution.py | 6 ++----
sdks/python/apache_beam/metrics/metric.py | 4 ++++
sdks/python/apache_beam/metrics/metricbase.py | 2 ++
5 files changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a6543abb/sdks/python/apache_beam/metrics/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/__init__.py b/sdks/python/apache_beam/metrics/__init__.py
index 164d1a8..8ce7bbb 100644
--- a/sdks/python/apache_beam/metrics/__init__.py
+++ b/sdks/python/apache_beam/metrics/__init__.py
@@ -15,3 +15,4 @@
# limitations under the License.
#
from apache_beam.metrics.metric import Metrics
+from apache_beam.metrics.metric import MetricsFilter
http://git-wip-us.apache.org/repos/asf/beam/blob/a6543abb/sdks/python/apache_beam/metrics/cells.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py
index fbe3ad3..ba840f7 100644
--- a/sdks/python/apache_beam/metrics/cells.py
+++ b/sdks/python/apache_beam/metrics/cells.py
@@ -29,6 +29,8 @@ import threading
from apache_beam.metrics.metricbase import Counter
from apache_beam.metrics.metricbase import Distribution
+__all__ = ['DistributionResult']
+
class CellCommitState(object):
"""For internal use only; no backwards-compatibility guarantees.
http://git-wip-us.apache.org/repos/asf/beam/blob/a6543abb/sdks/python/apache_beam/metrics/execution.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py
index a06ec0c..675e49c 100644
--- a/sdks/python/apache_beam/metrics/execution.py
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -24,7 +24,7 @@ Available classes:
- MetricKey - Internal key for a metric.
- MetricResult - Current status of a metric's updates/commits.
-- MetricsEnvironment - Keeps track of MetricsContainer and other metrics
+- _MetricsEnvironment - Keeps track of MetricsContainer and other metrics
information for every single execution working thread.
- MetricsContainer - Holds the metrics of a single step and a single
unit-of-commit (bundle).
@@ -36,9 +36,7 @@ from apache_beam.metrics.cells import CounterCell, DistributionCell
class MetricKey(object):
- """
-
- Key used to identify instance of metric cell.
+ """Key used to identify instance of metric cell.
Metrics are internally keyed by the step name they associated with and
the name of the metric.
http://git-wip-us.apache.org/repos/asf/beam/blob/a6543abb/sdks/python/apache_beam/metrics/metric.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py
index 33db4e1..f99c0c4 100644
--- a/sdks/python/apache_beam/metrics/metric.py
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -30,6 +30,8 @@ from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.metrics.metricbase import Counter, Distribution
from apache_beam.metrics.metricbase import MetricName
+__all__ = ['Metrics', 'MetricsFilter']
+
class Metrics(object):
"""Lets users create/access metric objects during pipeline execution."""
@@ -146,6 +148,8 @@ class MetricResults(object):
class MetricsFilter(object):
"""Simple object to filter metrics results.
+ This class is experimental. No backwards-compatibility guarantees.
+
If filters by matching a result's step-namespace-name with three internal
sets. No execution/matching logic is added to this object, so that it may
be used to construct arguments as an RPC request. It is left for runners
http://git-wip-us.apache.org/repos/asf/beam/blob/a6543abb/sdks/python/apache_beam/metrics/metricbase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/metricbase.py b/sdks/python/apache_beam/metrics/metricbase.py
index fa0ca75..699f29c 100644
--- a/sdks/python/apache_beam/metrics/metricbase.py
+++ b/sdks/python/apache_beam/metrics/metricbase.py
@@ -30,6 +30,8 @@ Available classes:
- MetricName - Namespace and name used to refer to a Metric.
"""
+__all__ = ['Metric', 'Counter', 'Distribution', 'MetricName']
+
class MetricName(object):
"""The name of a metric.
[10/19] beam git commit: [BEAM-1345] Clearly delineate public API in
apache_beam/options
Posted by al...@apache.org.
[BEAM-1345] Clearly delineate public API in apache_beam/options
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6d77f958
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6d77f958
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6d77f958
Branch: refs/heads/release-2.0.0
Commit: 6d77f958de666ccf5f59e907c292efbc9272b49b
Parents: 0a0cc2d
Author: Charles Chen <cc...@google.com>
Authored: Thu May 11 13:31:18 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700
----------------------------------------------------------------------
.../apache_beam/options/pipeline_options.py | 20 +++++++++++++++++---
.../options/pipeline_options_validator.py | 2 ++
.../apache_beam/options/value_provider.py | 8 ++++++++
3 files changed, 27 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6d77f958/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index b79d85d..983d128 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -25,6 +25,20 @@ from apache_beam.options.value_provider import RuntimeValueProvider
from apache_beam.options.value_provider import ValueProvider
+__all__ = [
+ 'PipelineOptions',
+ 'StandardOptions',
+ 'TypeOptions',
+ 'DirectOptions',
+ 'GoogleCloudOptions',
+ 'WorkerOptions',
+ 'DebugOptions',
+ 'ProfilingOptions',
+ 'SetupOptions',
+ 'TestOptions',
+ ]
+
+
def _static_value_provider_of(value_type):
""""Helper function to plug a ValueProvider into argparse.
@@ -42,7 +56,7 @@ def _static_value_provider_of(value_type):
return _f
-class BeamArgumentParser(argparse.ArgumentParser):
+class _BeamArgumentParser(argparse.ArgumentParser):
"""An ArgumentParser that supports ValueProvider options.
Example Usage::
@@ -133,7 +147,7 @@ class PipelineOptions(HasDisplayData):
"""
self._flags = flags
self._all_options = kwargs
- parser = BeamArgumentParser()
+ parser = _BeamArgumentParser()
for cls in type(self).mro():
if cls == PipelineOptions:
@@ -187,7 +201,7 @@ class PipelineOptions(HasDisplayData):
# TODO(BEAM-1319): PipelineOption sub-classes in the main session might be
# repeated. Pick last unique instance of each subclass to avoid conflicts.
subset = {}
- parser = BeamArgumentParser()
+ parser = _BeamArgumentParser()
for cls in PipelineOptions.__subclasses__():
subset[str(cls)] = cls
for cls in subset.values():
http://git-wip-us.apache.org/repos/asf/beam/blob/6d77f958/sdks/python/apache_beam/options/pipeline_options_validator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options_validator.py b/sdks/python/apache_beam/options/pipeline_options_validator.py
index 5c1ce2a..24d2e55 100644
--- a/sdks/python/apache_beam/options/pipeline_options_validator.py
+++ b/sdks/python/apache_beam/options/pipeline_options_validator.py
@@ -16,6 +16,8 @@
#
"""Pipeline options validator.
+
+For internal use only; no backwards-compatibility guarantees.
"""
import re
http://git-wip-us.apache.org/repos/asf/beam/blob/6d77f958/sdks/python/apache_beam/options/value_provider.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/value_provider.py b/sdks/python/apache_beam/options/value_provider.py
index c00d7bc..40bddba 100644
--- a/sdks/python/apache_beam/options/value_provider.py
+++ b/sdks/python/apache_beam/options/value_provider.py
@@ -24,6 +24,14 @@ from functools import wraps
from apache_beam import error
+__all__ = [
+ 'ValueProvider',
+ 'StaticValueProvider',
+ 'RuntimeValueProvider',
+ 'check_accessible',
+ ]
+
+
class ValueProvider(object):
def is_accessible(self):
raise NotImplementedError(
[04/19] beam git commit: [BEAM-1345] Clearly delineate public api in
runners package.
Posted by al...@apache.org.
[BEAM-1345] Clearly delineate public api in runners package.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aeeefc17
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aeeefc17
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aeeefc17
Branch: refs/heads/release-2.0.0
Commit: aeeefc1725bca11f765f57bf2833d606a0e6d6ba
Parents: 6d77f95
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu May 11 13:41:24 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/runners/api/__init__.py | 4 +++-
sdks/python/apache_beam/runners/common.py | 5 ++++-
sdks/python/apache_beam/runners/dataflow/__init__.py | 9 +++++++++
sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 3 +++
.../apache_beam/runners/dataflow/test_dataflow_runner.py | 3 +++
sdks/python/apache_beam/runners/direct/__init__.py | 6 +++++-
sdks/python/apache_beam/runners/direct/direct_runner.py | 3 +++
sdks/python/apache_beam/runners/pipeline_context.py | 6 ++++++
sdks/python/apache_beam/runners/portability/__init__.py | 2 ++
sdks/python/apache_beam/runners/runner.py | 3 +++
sdks/python/apache_beam/runners/worker/__init__.py | 2 ++
11 files changed, 43 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/api/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/api/__init__.py b/sdks/python/apache_beam/runners/api/__init__.py
index e94673c..bf95208 100644
--- a/sdks/python/apache_beam/runners/api/__init__.py
+++ b/sdks/python/apache_beam/runners/api/__init__.py
@@ -15,7 +15,9 @@
# limitations under the License.
#
-"""Checked in to avoid protoc dependency for Python development.
+"""For internal use only; no backwards-compatibility guarantees.
+
+Checked in to avoid protoc dependency for Python development.
Regenerate files with::
http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 86db711..8453569 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -17,7 +17,10 @@
# cython: profile=True
-"""Worker operations executor."""
+"""Worker operations executor.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
import sys
import traceback
http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/dataflow/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/__init__.py b/sdks/python/apache_beam/runners/dataflow/__init__.py
index cce3aca..6674ba5 100644
--- a/sdks/python/apache_beam/runners/dataflow/__init__.py
+++ b/sdks/python/apache_beam/runners/dataflow/__init__.py
@@ -14,3 +14,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
+"""The DataflowRunner executes pipelines on Google Cloud Dataflow.
+
+Anything in this package not imported here is an internal implementation detail
+with no backwards-compatibility guarantees.
+"""
+
+from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
+from apache_beam.runners.dataflow.test_dataflow_runner import TestDataflowRunner
http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 796a67b..3d8437c 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -46,6 +46,9 @@ from apache_beam.typehints import typehints
from apache_beam.options.pipeline_options import StandardOptions
+__all__ = ['DataflowRunner']
+
+
class DataflowRunner(PipelineRunner):
"""A runner that creates job graphs and submits them for remote execution.
http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index 4fd1026..b339882 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -22,6 +22,9 @@ from apache_beam.options.pipeline_options import TestOptions, GoogleCloudOptions
from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
+__all__ = ['TestDataflowRunner']
+
+
class TestDataflowRunner(DataflowRunner):
def run(self, pipeline):
"""Execute test pipeline and verify test matcher"""
http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/direct/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/__init__.py b/sdks/python/apache_beam/runners/direct/__init__.py
index 0d64513..0f82756 100644
--- a/sdks/python/apache_beam/runners/direct/__init__.py
+++ b/sdks/python/apache_beam/runners/direct/__init__.py
@@ -15,5 +15,9 @@
# limitations under the License.
#
-"""Inprocess runner executes pipelines locally in a single process."""
+"""Inprocess runner executes pipelines locally in a single process.
+
+Anything in this package not imported here is an internal implementation detail
+with no backwards-compatibility guarantees.
+"""
from apache_beam.runners.direct.direct_runner import DirectRunner
http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 535aac3..ecf5114 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -36,6 +36,9 @@ from apache_beam.options.pipeline_options import DirectOptions
from apache_beam.options.value_provider import RuntimeValueProvider
+__all__ = ['DirectRunner']
+
+
class DirectRunner(PipelineRunner):
"""Executes a single pipeline on the local machine."""
http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/pipeline_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
index d3d3c24..1c89d06 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -15,6 +15,12 @@
# limitations under the License.
#
+"""Utility class for serializing pipelines via the runner API.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
+
+
from apache_beam import pipeline
from apache_beam import pvalue
from apache_beam import coders
http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/portability/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/__init__.py b/sdks/python/apache_beam/runners/portability/__init__.py
index cce3aca..7af93ed 100644
--- a/sdks/python/apache_beam/runners/portability/__init__.py
+++ b/sdks/python/apache_beam/runners/portability/__init__.py
@@ -14,3 +14,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
+"""This runner is experimental; no backwards-compatibility guarantees."""
http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index d875fdc..af00d8f 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -26,6 +26,9 @@ import shutil
import tempfile
+__all__ = ['PipelineRunner', 'PipelineState', 'PipelineResult']
+
+
def _get_runner_map(runner_names, module_path):
"""Create a map of runner name in lower case to full import path to the
runner class.
http://git-wip-us.apache.org/repos/asf/beam/blob/aeeefc17/sdks/python/apache_beam/runners/worker/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/__init__.py b/sdks/python/apache_beam/runners/worker/__init__.py
index cce3aca..0bce5d6 100644
--- a/sdks/python/apache_beam/runners/worker/__init__.py
+++ b/sdks/python/apache_beam/runners/worker/__init__.py
@@ -14,3 +14,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
+"""For internal use only; no backwards-compatibility guarantees."""
[15/19] beam git commit: [BEAM-1345] Annotate public members of
pvalue.
Posted by al...@apache.org.
[BEAM-1345] Annotate public members of pvalue.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0da682d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0da682d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0da682d
Branch: refs/heads/release-2.0.0
Commit: d0da682d5e981217c90571febdff728b8abbbc14
Parents: 2bb95fd
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu May 11 12:07:00 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:37 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pvalue.py | 11 +++++++++++
1 file changed, 11 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d0da682d/sdks/python/apache_beam/pvalue.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index fa91fe3..7385e82 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -31,6 +31,17 @@ import itertools
from apache_beam import typehints
+__all__ = [
+ 'PCollection',
+ 'TaggedOutput',
+ 'AsSingleton',
+ 'AsIter',
+ 'AsList',
+ 'AsDict',
+ 'EmptySideInput',
+]
+
+
class PValue(object):
"""Base class for PCollection.
[11/19] beam git commit: [BEAM-1345] Mark windowed value as
experimental
Posted by al...@apache.org.
[BEAM-1345] Mark windowed value as experimental
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5b095118
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5b095118
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5b095118
Branch: refs/heads/release-2.0.0
Commit: 5b09511873136cc6aff8c75f3767c2d81e288940
Parents: 9730f56
Author: Sourabh Bajaj <so...@google.com>
Authored: Thu May 11 11:58:08 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/utils/windowed_value.py | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5b095118/sdks/python/apache_beam/utils/windowed_value.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py
index 87c26d1..be27854 100644
--- a/sdks/python/apache_beam/utils/windowed_value.py
+++ b/sdks/python/apache_beam/utils/windowed_value.py
@@ -16,10 +16,12 @@
#
"""Core windowing data structures.
+
+This module is experimental. No backwards-compatibility guarantees.
"""
# This module is carefully crafted to have optimal performance when
-# compiled whiel still being valid Python. Care needs to be taken when
+# compiled while still being valid Python. Care needs to be taken when
# editing this file as WindowedValues are created for every element for
# every step in a Beam pipeline.
[06/19] beam git commit: [BEAM-1345] Clearly delineate public api in
apache_beam/coders.
Posted by al...@apache.org.
[BEAM-1345] Clearly delineate public api in apache_beam/coders.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7fd012b6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7fd012b6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7fd012b6
Branch: refs/heads/release-2.0.0
Commit: 7fd012b63f7ff8b10d80bce53a860ad7102673d6
Parents: 0ce2543
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed May 10 17:13:06 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 16:20:36 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coder_impl.py | 2 ++
sdks/python/apache_beam/coders/coders.py | 32 +++++++++++++++++---
sdks/python/apache_beam/coders/coders_test.py | 11 ++++---
.../apache_beam/coders/coders_test_common.py | 2 +-
sdks/python/apache_beam/coders/observable.py | 5 ++-
sdks/python/apache_beam/coders/slow_stream.py | 5 ++-
.../apache_beam/coders/standard_coders_test.py | 2 +-
sdks/python/apache_beam/coders/stream.pyx | 5 +++
sdks/python/apache_beam/coders/typecoders.py | 3 ++
.../examples/snippets/snippets_test.py | 3 +-
sdks/python/apache_beam/io/fileio_test.py | 2 +-
sdks/python/apache_beam/io/textio.py | 2 +-
sdks/python/apache_beam/transforms/window.py | 2 +-
13 files changed, 59 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index a0496a2..10298bf 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -23,6 +23,8 @@ encode many elements with minimal overhead.
This module may be optionally compiled with Cython, using the corresponding
coder_impl.pxd file for type hints.
+
+For internal use only; no backwards-compatibility guarantees.
"""
from types import NoneType
http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 4f75182..ce914dd 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -15,7 +15,10 @@
# limitations under the License.
#
-"""Collection of useful coders."""
+"""Collection of useful coders.
+
+Only those coders listed in __all__ are part of the public API of this module.
+"""
import base64
import cPickle as pickle
@@ -45,6 +48,13 @@ except ImportError:
import dill
+__all__ = ['Coder',
+ 'BytesCoder', 'DillCoder', 'FastPrimitivesCoder', 'FloatCoder',
+ 'IterableCoder', 'PickleCoder', 'ProtoCoder', 'SingletonCoder',
+ 'StrUtf8Coder', 'TimestampCoder', 'TupleCoder',
+ 'TupleSequenceCoder', 'VarIntCoder', 'WindowedValueCoder']
+
+
def serialize_coder(coder):
from apache_beam.internal import pickler
return '%s$%s' % (coder.__class__.__name__, pickler.dumps(coder))
@@ -116,6 +126,10 @@ class Coder(object):
self.estimate_size)
def get_impl(self):
+ """For internal use only; no backwards-compatibility guarantees.
+
+ Returns the CoderImpl backing this Coder.
+ """
if not hasattr(self, '_impl'):
self._impl = self._create_impl()
assert isinstance(self._impl, coder_impl.CoderImpl)
@@ -152,13 +166,17 @@ class Coder(object):
raise ValueError('Not a KV coder: %s.' % self)
def _get_component_coders(self):
- """Returns the internal component coders of this coder."""
+ """For internal use only; no backwards-compatibility guarantees.
+
+ Returns the internal component coders of this coder."""
# This is an internal detail of the Coder API and does not need to be
# refined in user-defined Coders.
return []
def as_cloud_object(self):
- """Returns Google Cloud Dataflow API description of this coder."""
+ """For internal use only; no backwards-compatibility guarantees.
+
+ Returns Google Cloud Dataflow API description of this coder."""
# This is an internal detail of the Coder API and does not need to be
# refined in user-defined Coders.
@@ -184,6 +202,8 @@ class Coder(object):
# pylint: enable=protected-access
def to_runner_api(self, context):
+ """For internal use only; no backwards-compatibility guarantees.
+ """
# TODO(BEAM-115): Use specialized URNs and components.
from apache_beam.runners.api import beam_runner_api_pb2
return beam_runner_api_pb2.Coder(
@@ -196,6 +216,8 @@ class Coder(object):
@staticmethod
def from_runner_api(proto, context):
+ """For internal use only; no backwards-compatibility guarantees.
+ """
any_proto = proto.spec.spec.parameter
bytes_proto = google.protobuf.wrappers_pb2.BytesValue()
any_proto.Unpack(bytes_proto)
@@ -779,7 +801,9 @@ class WindowedValueCoder(FastCoder):
class LengthPrefixCoder(FastCoder):
- """Coder which prefixes the length of the encoded object in the stream."""
+ """For internal use only; no backwards-compatibility guarantees.
+
+ Coder which prefixes the length of the encoded object in the stream."""
def __init__(self, value_coder):
self._value_coder = value_coder
http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/coders/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py
index 575503b..c89e810 100644
--- a/sdks/python/apache_beam/coders/coders_test.py
+++ b/sdks/python/apache_beam/coders/coders_test.py
@@ -20,8 +20,9 @@ import base64
import logging
import unittest
-from apache_beam import coders
+from apache_beam.coders import coders
from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
+from apache_beam.coders.typecoders import registry as coders_registry
class PickleCoderTest(unittest.TestCase):
@@ -46,13 +47,13 @@ class PickleCoderTest(unittest.TestCase):
class CodersTest(unittest.TestCase):
def test_str_utf8_coder(self):
- real_coder = coders.registry.get_coder(str)
+ real_coder = coders_registry.get_coder(str)
expected_coder = coders.BytesCoder()
self.assertEqual(
real_coder.encode('abc'), expected_coder.encode('abc'))
self.assertEqual('abc', real_coder.decode(real_coder.encode('abc')))
- real_coder = coders.registry.get_coder(bytes)
+ real_coder = coders_registry.get_coder(bytes)
expected_coder = coders.BytesCoder()
self.assertEqual(
real_coder.encode('abc'), expected_coder.encode('abc'))
@@ -82,7 +83,7 @@ class ProtoCoderTest(unittest.TestCase):
mb.field1 = True
ma.field1 = u'hello world'
expected_coder = coders.ProtoCoder(ma.__class__)
- real_coder = coders.registry.get_coder(ma.__class__)
+ real_coder = coders_registry.get_coder(ma.__class__)
self.assertEqual(expected_coder, real_coder)
self.assertEqual(real_coder.encode(ma), expected_coder.encode(ma))
self.assertEqual(ma, real_coder.decode(real_coder.encode(ma)))
@@ -104,7 +105,7 @@ class FallbackCoderTest(unittest.TestCase):
def test_default_fallback_path(self):
"""Test fallback path picks a matching coder if no coder is registered."""
- coder = coders.registry.get_coder(DummyClass)
+ coder = coders_registry.get_coder(DummyClass)
# No matching coder, so picks the last fallback coder which is a
# FastPrimitivesCoder.
self.assertEqual(coder, coders.FastPrimitivesCoder())
http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index e5bfe35..c9b67b3 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -23,12 +23,12 @@ import unittest
import dill
-import coders
import observable
from apache_beam.transforms import window
from apache_beam.utils import timestamp
from apache_beam.utils import windowed_value
+from apache_beam.coders import coders
from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/coders/observable.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/observable.py b/sdks/python/apache_beam/coders/observable.py
index 5a808d8..fc952cf 100644
--- a/sdks/python/apache_beam/coders/observable.py
+++ b/sdks/python/apache_beam/coders/observable.py
@@ -16,7 +16,10 @@
#
-"""Observable base class for iterables."""
+"""Observable base class for iterables.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
class ObservableMixin(object):
http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/coders/slow_stream.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py
index 85837bc..1ab55d9 100644
--- a/sdks/python/apache_beam/coders/slow_stream.py
+++ b/sdks/python/apache_beam/coders/slow_stream.py
@@ -15,7 +15,10 @@
# limitations under the License.
#
-"""A pure Python implementation of stream.pyx."""
+"""A pure Python implementation of stream.pyx.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
import struct
http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/coders/standard_coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index 885e88f..5f98455 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -26,7 +26,7 @@ import unittest
import yaml
-from apache_beam import coders
+from apache_beam.coders import coders
from apache_beam.coders import coder_impl
from apache_beam.utils import windowed_value
from apache_beam.utils.timestamp import Timestamp
http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/coders/stream.pyx
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/stream.pyx b/sdks/python/apache_beam/coders/stream.pyx
index ae24418..8d97681 100644
--- a/sdks/python/apache_beam/coders/stream.pyx
+++ b/sdks/python/apache_beam/coders/stream.pyx
@@ -15,6 +15,11 @@
# limitations under the License.
#
+"""Compiled version of the Stream objects used by CoderImpl.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
+
cimport libc.stdlib
cimport libc.string
http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/coders/typecoders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py
index 60832c9..3894bb5 100644
--- a/sdks/python/apache_beam/coders/typecoders.py
+++ b/sdks/python/apache_beam/coders/typecoders.py
@@ -71,6 +71,9 @@ from apache_beam.coders import coders
from apache_beam.typehints import typehints
+__all__ = ['registry']
+
+
class CoderRegistry(object):
"""A coder registry for typehint/coder associations."""
http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 85d8bde..211da24 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -29,6 +29,7 @@ import apache_beam as beam
from apache_beam import coders
from apache_beam import pvalue
from apache_beam import typehints
+from apache_beam.coders.coders import ToStringCoder
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
from apache_beam.options.pipeline_options import PipelineOptions
@@ -422,7 +423,7 @@ class SnippetsTest(unittest.TestCase):
def __init__(self, file_to_write):
self.file_to_write = file_to_write
self.file_obj = None
- self.coder = coders.ToStringCoder()
+ self.coder = ToStringCoder()
def start_bundle(self):
assert self.file_to_write
http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 4c25505..b92b8be 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -29,7 +29,7 @@ import hamcrest as hc
import mock
import apache_beam as beam
-from apache_beam import coders
+from apache_beam.coders import coders
from apache_beam.io import fileio
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.display import DisplayData
http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 750ec45..d43f4fc 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -21,7 +21,7 @@
from __future__ import absolute_import
import logging
-from apache_beam import coders
+from apache_beam.coders import coders
from apache_beam.io import filebasedsource
from apache_beam.io import fileio
from apache_beam.io import iobase
http://git-wip-us.apache.org/repos/asf/beam/blob/7fd012b6/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 29994c0..6d0db3a 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -53,7 +53,7 @@ import abc
from google.protobuf import struct_pb2
-from apache_beam import coders
+from apache_beam.coders import coders
from apache_beam.runners.api import beam_runner_api_pb2
from apache_beam.transforms import timeutil
from apache_beam.utils import proto_utils