You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/05/11 22:44:01 UTC
[1/2] beam git commit: Remove some internal details from the public
API.
Repository: beam
Updated Branches:
refs/heads/master 1b31167fb -> fe625678f
Remove some internal details from the public API.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/98e685d8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/98e685d8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/98e685d8
Branch: refs/heads/master
Commit: 98e685d85a4cf75faee8156516d7d3aff60077f3
Parents: 1b31167
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed May 10 15:55:46 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 15:43:17 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 4 ++--
.../runners/dataflow/dataflow_runner.py | 6 +++---
.../runners/dataflow/dataflow_runner_test.py | 6 +++---
.../apache_beam/runners/direct/executor.py | 2 +-
.../runners/direct/transform_evaluator.py | 10 ++++-----
sdks/python/apache_beam/transforms/__init__.py | 2 +-
sdks/python/apache_beam/transforms/core.py | 22 ++++++++++----------
.../python/apache_beam/transforms/ptransform.py | 6 +++---
.../apache_beam/transforms/ptransform_test.py | 12 +++++------
sdks/python/apache_beam/typehints/typecheck.py | 2 +-
10 files changed, 36 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/98e685d8/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 79480d7..5048534 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -77,8 +77,8 @@ class Pipeline(object):
the PValues are the edges.
All the transforms applied to the pipeline must have distinct full labels.
- If same transform instance needs to be applied then a clone should be created
- with a new label (e.g., transform.clone('new label')).
+ If same transform instance needs to be applied then the right shift operator
+ should be used to designate new names (e.g. `input | "label" >> my_tranform`).
"""
def __init__(self, runner=None, options=None, argv=None):
http://git-wip-us.apache.org/repos/asf/beam/blob/98e685d8/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index da8de9d..0ecd22a 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -160,7 +160,7 @@ class DataflowRunner(PipelineRunner):
class GroupByKeyInputVisitor(PipelineVisitor):
"""A visitor that replaces `Any` element type for input `PCollection` of
- a `GroupByKey` or `GroupByKeyOnly` with a `KV` type.
+ a `GroupByKey` or `_GroupByKeyOnly` with a `KV` type.
TODO(BEAM-115): Once Python SDK is compatible with the new Runner API,
we could directly replace the coder instead of mutating the element type.
@@ -169,8 +169,8 @@ class DataflowRunner(PipelineRunner):
def visit_transform(self, transform_node):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam.transforms.core import GroupByKey, GroupByKeyOnly
- if isinstance(transform_node.transform, (GroupByKey, GroupByKeyOnly)):
+ from apache_beam.transforms.core import GroupByKey, _GroupByKeyOnly
+ if isinstance(transform_node.transform, (GroupByKey, _GroupByKeyOnly)):
pcoll = transform_node.inputs[0]
input_type = pcoll.element_type
# If input_type is not specified, then treat it as `Any`.
http://git-wip-us.apache.org/repos/asf/beam/blob/98e685d8/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index ac9b028..ff4b51d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -37,7 +37,7 @@ from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeExceptio
from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.display import DisplayDataItem
-from apache_beam.transforms.core import GroupByKeyOnly
+from apache_beam.transforms.core import _GroupByKeyOnly
from apache_beam.typehints import typehints
# Protect against environments where apitools library is not available.
@@ -185,7 +185,7 @@ class DataflowRunnerTest(unittest.TestCase):
pcoll1 = PCollection(p)
pcoll2 = PCollection(p)
pcoll3 = PCollection(p)
- for transform in [GroupByKeyOnly(), beam.GroupByKey()]:
+ for transform in [_GroupByKeyOnly(), beam.GroupByKey()]:
pcoll1.element_type = None
pcoll2.element_type = typehints.Any
pcoll3.element_type = typehints.KV[typehints.Any, typehints.Any]
@@ -199,7 +199,7 @@ class DataflowRunnerTest(unittest.TestCase):
p = TestPipeline()
pcoll1 = PCollection(p)
pcoll2 = PCollection(p)
- for transform in [GroupByKeyOnly(), beam.GroupByKey()]:
+ for transform in [_GroupByKeyOnly(), beam.GroupByKey()]:
pcoll1.element_type = typehints.TupleSequenceConstraint
pcoll2.element_type = typehints.Set
err_msg = "Input to GroupByKey must be of Tuple or Any type"
http://git-wip-us.apache.org/repos/asf/beam/blob/98e685d8/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 9efbede..86db291 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -155,7 +155,7 @@ class _SerialEvaluationState(_TransformEvaluationState):
item of work will be submitted to the ExecutorService at any time.
A principal use of this is for evaluators that keeps a global state such as
- GroupByKeyOnly.
+ _GroupByKeyOnly.
"""
def __init__(self, executor_service, scheduled):
http://git-wip-us.apache.org/repos/asf/beam/blob/98e685d8/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 6984ded..b1cb626 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -53,7 +53,7 @@ class TransformEvaluatorRegistry(object):
io.Read: _BoundedReadEvaluator,
core.Flatten: _FlattenEvaluator,
core.ParDo: _ParDoEvaluator,
- core.GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
+ core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
_NativeWrite: _NativeWriteEvaluator,
}
@@ -83,7 +83,7 @@ class TransformEvaluatorRegistry(object):
"""Returns True if this applied_ptransform should run one bundle at a time.
Some TransformEvaluators use a global state object to keep track of their
- global execution state. For example evaluator for GroupByKeyOnly uses this
+ global execution state. For example evaluator for _GroupByKeyOnly uses this
state as an in memory dictionary to buffer keys.
Serially executed evaluators will act as syncing point in the graph and
@@ -99,7 +99,7 @@ class TransformEvaluatorRegistry(object):
True if executor should execute applied_ptransform serially.
"""
return isinstance(applied_ptransform.transform,
- (core.GroupByKeyOnly, _NativeWrite))
+ (core._GroupByKeyOnly, _NativeWrite))
class _TransformEvaluator(object):
@@ -325,7 +325,7 @@ class _ParDoEvaluator(_TransformEvaluator):
class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
- """TransformEvaluator for GroupByKeyOnly transform."""
+ """TransformEvaluator for _GroupByKeyOnly transform."""
MAX_ELEMENT_PER_BUNDLE = None
@@ -369,7 +369,7 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
k, v = element.value
self.state.output[self.key_coder.encode(k)].append(v)
else:
- raise TypeCheckError('Input to GroupByKeyOnly must be a PCollection of '
+ raise TypeCheckError('Input to _GroupByKeyOnly must be a PCollection of '
'windowed key-value pairs. Instead received: %r.'
% element)
http://git-wip-us.apache.org/repos/asf/beam/blob/98e685d8/sdks/python/apache_beam/transforms/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py
index 847fb8f..b77b0f6 100644
--- a/sdks/python/apache_beam/transforms/__init__.py
+++ b/sdks/python/apache_beam/transforms/__init__.py
@@ -21,5 +21,5 @@
from apache_beam.transforms import combiners
from apache_beam.transforms.core import *
from apache_beam.transforms.ptransform import *
-from apache_beam.transforms.timeutil import *
+from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.util import *
http://git-wip-us.apache.org/repos/asf/beam/blob/98e685d8/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index abe699f..0e497f9 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -215,7 +215,7 @@ class DoFn(WithTypeHints, HasDisplayData):
return Any
return type_hint
- def process_argspec_fn(self):
+ def _process_argspec_fn(self):
"""Returns the Python callable that will eventually be invoked.
This should ideally be the user-level function that is called with
@@ -307,7 +307,7 @@ class CallableWrapperDoFn(DoFn):
return self._strip_output_annotations(
trivial_inference.infer_return_type(self._fn, [input_type]))
- def process_argspec_fn(self):
+ def _process_argspec_fn(self):
return getattr(self._fn, '_argspec_fn', self._fn)
@@ -641,8 +641,8 @@ class ParDo(PTransformWithSideInputs):
return fn
return CallableWrapperDoFn(fn)
- def process_argspec_fn(self):
- return self.fn.process_argspec_fn()
+ def _process_argspec_fn(self):
+ return self.fn._process_argspec_fn()
def display_data(self):
return {'fn': DisplayDataItem(self.fn.__class__,
@@ -870,19 +870,19 @@ class CombineGlobally(PTransform):
def default_label(self):
return 'CombineGlobally(%s)' % ptransform.label_from_callable(self.fn)
- def clone(self, **extra_attributes):
+ def _clone(self, **extra_attributes):
clone = copy.copy(self)
clone.__dict__.update(extra_attributes)
return clone
def with_defaults(self, has_defaults=True):
- return self.clone(has_defaults=has_defaults)
+ return self._clone(has_defaults=has_defaults)
def without_defaults(self):
return self.with_defaults(False)
def as_singleton_view(self):
- return self.clone(as_view=True)
+ return self._clone(as_view=True)
def expand(self, pcoll):
def add_input_types(transform):
@@ -964,7 +964,7 @@ class CombinePerKey(PTransformWithSideInputs):
def default_label(self):
return '%s(%s)' % (self.__class__.__name__, self._fn_label)
- def process_argspec_fn(self):
+ def _process_argspec_fn(self):
return self.fn._fn # pylint: disable=protected-access
def expand(self, pcoll):
@@ -1133,7 +1133,7 @@ class GroupByKey(PTransform):
return (pcoll
| 'ReifyWindows' >> (ParDo(self.ReifyWindows())
.with_output_types(reify_output_type))
- | 'GroupByKey' >> (GroupByKeyOnly()
+ | 'GroupByKey' >> (_GroupByKeyOnly()
.with_input_types(reify_output_type)
.with_output_types(gbk_input_type))
| ('GroupByWindow' >> ParDo(
@@ -1144,14 +1144,14 @@ class GroupByKey(PTransform):
# The input_type is None, run the default
return (pcoll
| 'ReifyWindows' >> ParDo(self.ReifyWindows())
- | 'GroupByKey' >> GroupByKeyOnly()
+ | 'GroupByKey' >> _GroupByKeyOnly()
| 'GroupByWindow' >> ParDo(
self.GroupAlsoByWindow(pcoll.windowing)))
@typehints.with_input_types(typehints.KV[K, V])
@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
-class GroupByKeyOnly(PTransform):
+class _GroupByKeyOnly(PTransform):
"""A group by key transform, ignoring windows."""
def infer_output_type(self, input_type):
key_type, value_type = trivial_inference.key_value_types(input_type)
http://git-wip-us.apache.org/repos/asf/beam/blob/98e685d8/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 8898c36..bd2a120 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -303,7 +303,7 @@ class PTransform(WithTypeHints, HasDisplayData):
# TODO(ccy): further refine this API.
return None
- def clone(self, new_label):
+ def _clone(self, new_label):
"""Clones the current transform instance under a new label."""
transform = copy.copy(self)
transform.label = new_label
@@ -567,7 +567,7 @@ class PTransformWithSideInputs(PTransform):
arg_types = [pvalueish.element_type] + [element_type(v) for v in args]
kwargs_types = {k: element_type(v) for (k, v) in kwargs.items()}
- argspec_fn = self.process_argspec_fn()
+ argspec_fn = self._process_argspec_fn()
bindings = getcallargs_forhints(argspec_fn, *arg_types, **kwargs_types)
hints = getcallargs_forhints(argspec_fn, *type_hints[0], **type_hints[1])
for arg, hint in hints.items():
@@ -581,7 +581,7 @@ class PTransformWithSideInputs(PTransform):
'Type hint violation for \'%s\': requires %s but got %s for %s'
% (self.label, hint, bindings[arg], arg))
- def process_argspec_fn(self):
+ def _process_argspec_fn(self):
"""Returns an argspec of the function actually consuming the data.
"""
raise NotImplementedError
http://git-wip-us.apache.org/repos/asf/beam/blob/98e685d8/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index f790660..efc5978 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -35,7 +35,7 @@ import apache_beam.pvalue as pvalue
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
from apache_beam.transforms import window
-from apache_beam.transforms.core import GroupByKeyOnly
+from apache_beam.transforms.core import _GroupByKeyOnly
import apache_beam.transforms.combiners as combine
from apache_beam.transforms.display import DisplayData, DisplayDataItem
from apache_beam.transforms.ptransform import PTransform
@@ -580,7 +580,7 @@ class PTransformTest(unittest.TestCase):
pipeline = TestPipeline()
pcolls = pipeline | 'A' >> beam.Create(['a', 'b', 'f'])
with self.assertRaises(typehints.TypeCheckError) as cm:
- pcolls | 'D' >> GroupByKeyOnly()
+ pcolls | 'D' >> _GroupByKeyOnly()
pipeline.run()
expected_error_prefix = ('Input type hint violation at D: expected '
@@ -1088,7 +1088,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| 'Str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
| ('Pair' >> beam.Map(lambda x: (x, ord(x)))
.with_output_types(typehints.KV[str, str]))
- | GroupByKeyOnly())
+ | _GroupByKeyOnly())
# Output type should correctly be deduced.
# GBK-only should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
@@ -1112,7 +1112,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
| beam.Create([1, 2, 3]).with_output_types(int)
- | 'F' >> GroupByKeyOnly())
+ | 'F' >> _GroupByKeyOnly())
self.assertEqual("Input type hint violation at F: "
"expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -1155,7 +1155,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
(self.p
| 'Nums' >> beam.Create(range(5)).with_output_types(int)
| 'ModDup' >> beam.Map(lambda x: (x % 2, x))
- | GroupByKeyOnly())
+ | _GroupByKeyOnly())
self.assertEqual('Pipeline type checking is enabled, however no output '
'type-hint was found for the PTransform '
@@ -1978,7 +1978,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_gbk_type_inference(self):
self.assertEqual(
typehints.Tuple[str, typehints.Iterable[int]],
- GroupByKeyOnly().infer_output_type(typehints.KV[str, int]))
+ _GroupByKeyOnly().infer_output_type(typehints.KV[str, int]))
def test_pipeline_inference(self):
created = self.p | beam.Create(['a', 'b', 'c'])
http://git-wip-us.apache.org/repos/asf/beam/blob/98e685d8/sdks/python/apache_beam/typehints/typecheck.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py
index 09b73f9..89a5f5c 100644
--- a/sdks/python/apache_beam/typehints/typecheck.py
+++ b/sdks/python/apache_beam/typehints/typecheck.py
@@ -109,7 +109,7 @@ class TypeCheckWrapperDoFn(AbstractDoFnWrapper):
def __init__(self, dofn, type_hints, label=None):
super(TypeCheckWrapperDoFn, self).__init__(dofn)
self.dofn = dofn
- self._process_fn = self.dofn.process_argspec_fn()
+ self._process_fn = self.dofn._process_argspec_fn()
if type_hints.input_types:
input_args, input_kwargs = type_hints.input_types
self._input_hints = getcallargs_forhints(
[2/2] beam git commit: This closes #3065
Posted by al...@apache.org.
This closes #3065
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fe625678
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fe625678
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fe625678
Branch: refs/heads/master
Commit: fe625678f55d9c9cc288b644749d7916015cf080
Parents: 1b31167 98e685d
Author: Ahmet Altay <al...@google.com>
Authored: Thu May 11 15:43:21 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu May 11 15:43:21 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 4 ++--
.../runners/dataflow/dataflow_runner.py | 6 +++---
.../runners/dataflow/dataflow_runner_test.py | 6 +++---
.../apache_beam/runners/direct/executor.py | 2 +-
.../runners/direct/transform_evaluator.py | 10 ++++-----
sdks/python/apache_beam/transforms/__init__.py | 2 +-
sdks/python/apache_beam/transforms/core.py | 22 ++++++++++----------
.../python/apache_beam/transforms/ptransform.py | 6 +++---
.../apache_beam/transforms/ptransform_test.py | 12 +++++------
sdks/python/apache_beam/typehints/typecheck.py | 2 +-
10 files changed, 36 insertions(+), 36 deletions(-)
----------------------------------------------------------------------