You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/23 23:47:07 UTC
[01/12] incubator-beam git commit: Fix error messages for externally
named PTransforms.
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 9fe102a5c -> 38d9dea2e
Fix error messages for externally named PTransforms.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c186ce4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c186ce4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c186ce4
Branch: refs/heads/python-sdk
Commit: 7c186ce4ce18c98cf3979b6f0f07c72a47b42ec9
Parents: 031c4cc
Author: Robert Bradshaw <ro...@google.com>
Authored: Fri Jul 22 15:55:26 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:45 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 9 +++++++++
1 file changed, 9 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c186ce4/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 30ad315..aeed9f9 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -201,6 +201,15 @@ class Pipeline(object):
if not isinstance(transform, ptransform.PTransform):
raise TypeError("Expected a PTransform object, got %s" % transform)
+ if label:
+ # Temporarily override transform.label, as it is inspected by some PTransform
+ # operations (e.g. to produce error messages for type hint violations).
+ try:
+ old_label, transform.label = transform.label, label
+ return self.apply(transform, pvalueish)
+ finally:
+ transform.label = old_label
+
full_label = '/'.join([self._current_transform().full_label,
label or transform.label]).lstrip('/')
if full_label in self.applied_labels:
[04/12] incubator-beam git commit: Fix multi-input named PTransforms.
Posted by ro...@apache.org.
Fix multi-input named PTransforms.
The naming wrapper now delegates its __ror__ logic entirely to the wrapped transform.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/937cf69e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/937cf69e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/937cf69e
Branch: refs/heads/python-sdk
Commit: 937cf69e958d4a82fb274f311de248930298db69
Parents: 9fe102a
Author: Robert Bradshaw <ro...@google.com>
Authored: Fri Jul 22 14:32:33 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:45 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/transforms/ptransform.py | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937cf69e/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index da8b671..b652bca 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -400,7 +400,7 @@ class PTransform(WithTypeHints):
else:
return NotImplemented
- def __ror__(self, left):
+ def __ror__(self, left, label=None):
"""Used to apply this PTransform to non-PValues, e.g., a tuple."""
pvalueish, pvalues = self._extract_input_pvalues(left)
pipelines = [v.pipeline for v in pvalues if isinstance(v, pvalue.PValue)]
@@ -434,7 +434,7 @@ class PTransform(WithTypeHints):
if not isinstance(v, pvalue.PValue) and v is not None}
pvalueish = _SetInputPValues().visit(pvalueish, replacements)
self.pipeline = p
- result = p.apply(self, pvalueish)
+ result = p.apply(self, pvalueish, label)
if deferred:
return result
else:
@@ -720,5 +720,8 @@ class _NamedPTransform(PTransform):
super(_NamedPTransform, self).__init__(label)
self.transform = transform
+ def __ror__(self, pvalueish):
+ return self.transform.__ror__(pvalueish, self.label)
+
def apply(self, pvalue):
raise RuntimeError("Should never be applied directly.")
[02/12] incubator-beam git commit: Move names out of transform
constructors.
Posted by ro...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index a747112..0439fe1 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -39,16 +39,16 @@ class CombineTest(unittest.TestCase):
size = len(vals)
# First for global combines.
- pcoll = pipeline | Create('start', vals)
- result_mean = pcoll | combine.Mean.Globally('mean')
- result_count = pcoll | combine.Count.Globally('count')
+ pcoll = pipeline | 'start' >> Create(vals)
+ result_mean = pcoll | 'mean' >> combine.Mean.Globally()
+ result_count = pcoll | 'count' >> combine.Count.Globally()
assert_that(result_mean, equal_to([mean]), label='assert:mean')
assert_that(result_count, equal_to([size]), label='assert:size')
# Again for per-key combines.
- pcoll = pipeline | Create('start-perkey', [('a', x) for x in vals])
- result_key_mean = pcoll | combine.Mean.PerKey('mean-perkey')
- result_key_count = pcoll | combine.Count.PerKey('count-perkey')
+ pcoll = pipeline | 'start-perkey' >> Create([('a', x) for x in vals])
+ result_key_mean = pcoll | 'mean-perkey' >> combine.Mean.PerKey()
+ result_key_count = pcoll | 'count-perkey' >> combine.Count.PerKey()
assert_that(result_key_mean, equal_to([('a', mean)]), label='key:mean')
assert_that(result_key_count, equal_to([('a', size)]), label='key:size')
pipeline.run()
@@ -66,9 +66,9 @@ class CombineTest(unittest.TestCase):
9: 'nniiinne'}
# First for global combines.
- pcoll = pipeline | Create('start', [6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
- result_top = pcoll | combine.Top.Largest('top', 5)
- result_bot = pcoll | combine.Top.Smallest('bot', 4)
+ pcoll = pipeline | 'start' >> Create([6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
+ result_top = pcoll | 'top' >> combine.Top.Largest(5)
+ result_bot = pcoll | 'bot' >> combine.Top.Smallest(4)
result_cmp = pcoll | combine.Top.Of(
'cmp',
6,
@@ -81,8 +81,8 @@ class CombineTest(unittest.TestCase):
# Again for per-key combines.
pcoll = pipeline | Create(
'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
- result_key_top = pcoll | combine.Top.LargestPerKey('top-perkey', 5)
- result_key_bot = pcoll | combine.Top.SmallestPerKey('bot-perkey', 4)
+ result_key_top = pcoll | 'top-perkey' >> combine.Top.LargestPerKey(5)
+ result_key_bot = pcoll | 'bot-perkey' >> combine.Top.SmallestPerKey(4)
result_key_cmp = pcoll | combine.Top.PerKey(
'cmp-perkey',
6,
@@ -99,15 +99,15 @@ class CombineTest(unittest.TestCase):
def test_top_shorthands(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | Create('start', [6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
- result_top = pcoll | beam.CombineGlobally('top', combine.Largest(5))
- result_bot = pcoll | beam.CombineGlobally('bot', combine.Smallest(4))
+ pcoll = pipeline | 'start' >> Create([6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
+ result_top = pcoll | 'top' >> beam.CombineGlobally(combine.Largest(5))
+ result_bot = pcoll | 'bot' >> beam.CombineGlobally(combine.Smallest(4))
assert_that(result_top, equal_to([[9, 6, 6, 5, 3]]), label='assert:top')
assert_that(result_bot, equal_to([[0, 1, 1, 1]]), label='assert:bot')
pcoll = pipeline | Create(
'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
- result_ktop = pcoll | beam.CombinePerKey('top-perkey', combine.Largest(5))
+ result_ktop = pcoll | 'top-perkey' >> beam.CombinePerKey(combine.Largest(5))
result_kbot = pcoll | beam.CombinePerKey(
'bot-perkey', combine.Smallest(4))
assert_that(result_ktop, equal_to([('a', [9, 6, 6, 5, 3])]), label='k:top')
@@ -119,7 +119,7 @@ class CombineTest(unittest.TestCase):
# First test global samples (lots of them).
for ix in xrange(300):
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | Create('start', [1, 1, 2, 2])
+ pcoll = pipeline | 'start' >> Create([1, 1, 2, 2])
result = pcoll | combine.Sample.FixedSizeGlobally('sample-%d' % ix, 3)
def matcher():
@@ -141,7 +141,7 @@ class CombineTest(unittest.TestCase):
pcoll = pipeline | Create(
'start-perkey',
sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), []))
- result = pcoll | combine.Sample.FixedSizePerKey('sample', 3)
+ result = pcoll | 'sample' >> combine.Sample.FixedSizePerKey(3)
def matcher():
def match(actual):
@@ -158,7 +158,7 @@ class CombineTest(unittest.TestCase):
p = Pipeline('DirectPipelineRunner')
result = (
p
- | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
+ | 'a' >> Create([(100, 0.0), ('b', 10, -1), ('c', 1, 100)])
| beam.CombineGlobally(combine.TupleCombineFn(max,
combine.MeanCombineFn(),
sum)).without_defaults())
@@ -179,8 +179,8 @@ class CombineTest(unittest.TestCase):
def test_to_list_and_to_dict(self):
pipeline = Pipeline('DirectPipelineRunner')
the_list = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
- pcoll = pipeline | Create('start', the_list)
- result = pcoll | combine.ToList('to list')
+ pcoll = pipeline | 'start' >> Create(the_list)
+ result = pcoll | 'to list' >> combine.ToList()
def matcher(expected):
def match(actual):
@@ -191,8 +191,8 @@ class CombineTest(unittest.TestCase):
pipeline = Pipeline('DirectPipelineRunner')
pairs = [(1, 2), (3, 4), (5, 6)]
- pcoll = pipeline | Create('start-pairs', pairs)
- result = pcoll | combine.ToDict('to dict')
+ pcoll = pipeline | 'start-pairs' >> Create(pairs)
+ result = pcoll | 'to dict' >> combine.ToDict()
def matcher():
def match(actual):
@@ -221,8 +221,8 @@ class CombineTest(unittest.TestCase):
return main | Map(lambda _, s: s, side)
p = Pipeline('DirectPipelineRunner')
- result1 = p | Create('label1', []) | CombineWithSideInput('L1')
- result2 = p | Create('label2', [1, 2, 3, 4]) | CombineWithSideInput('L2')
+ result1 = p | 'label1' >> Create([]) | 'L1' >> CombineWithSideInput()
+ result2 = p | 'label2' >> Create([1, 2, 3, 4]) | 'L2' >> CombineWithSideInput()
assert_that(result1, equal_to([0]), label='r1')
assert_that(result2, equal_to([10]), label='r2')
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index b288321..44a6d29 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -814,12 +814,12 @@ class CombineGlobally(PTransform):
return transform
combined = (pcoll
- | add_input_types(Map('KeyWithVoid', lambda v: (None, v))
+ | 'KeyWithVoid' >> add_input_types(Map(lambda v: (None, v))
.with_output_types(
KV[None, pcoll.element_type]))
| CombinePerKey(
'CombinePerKey', self.fn, *self.args, **self.kwargs)
- | Map('UnKey', lambda (k, v): v))
+ | 'UnKey' >> Map(lambda (k, v): v))
if not self.has_defaults and not self.as_view:
return combined
@@ -851,8 +851,8 @@ class CombineGlobally(PTransform):
else:
return transform
return (pcoll.pipeline
- | Create('DoOnce', [None])
- | typed(Map('InjectDefault', lambda _, s: s, view)))
+ | 'DoOnce' >> Create([None])
+ | 'InjectDefault' >> typed(Map(lambda _, s: s, view)))
class CombinePerKey(PTransformWithSideInputs):
@@ -1045,9 +1045,9 @@ class GroupByKey(PTransform):
gbk_output_type = KV[key_type, Iterable[value_type]]
return (pcoll
- | (ParDo('reify_windows', self.ReifyWindows())
+ | 'reify_windows' >> (ParDo(self.ReifyWindows())
.with_output_types(reify_output_type))
- | (GroupByKeyOnly('group_by_key')
+ | 'group_by_key' >> (GroupByKeyOnly()
.with_input_types(reify_output_type)
.with_output_types(gbk_input_type))
| (ParDo('group_by_window',
@@ -1056,8 +1056,8 @@ class GroupByKey(PTransform):
.with_output_types(gbk_output_type)))
else:
return (pcoll
- | ParDo('reify_windows', self.ReifyWindows())
- | GroupByKeyOnly('group_by_key')
+ | 'reify_windows' >> ParDo(self.ReifyWindows())
+ | 'group_by_key' >> GroupByKeyOnly()
| ParDo('group_by_window',
self.GroupAlsoByWindow(pcoll.windowing)))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index b652bca..6eb28b0 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -691,7 +691,7 @@ def ptransform_fn(fn):
With either method the custom PTransform can be used in pipelines as if
it were one of the "native" PTransforms::
- result_pcoll = input_pcoll | CustomMapper('label', somefn)
+ result_pcoll = input_pcoll | 'label' >> CustomMapper(somefn)
Note that for both solutions the underlying implementation of the pipe
operator (i.e., `|`) will inject the pcoll argument in its proper place
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index feb081c..8121c1e 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -53,12 +53,12 @@ class PTransformTest(unittest.TestCase):
str(PTransform()))
pa = Pipeline('DirectPipelineRunner')
- res = pa | beam.Create('a_label', [1, 2])
+ res = pa | 'a_label' >> beam.Create([1, 2])
self.assertEqual('<Create(PTransform) label=[a_label]>',
str(res.producer.transform))
pc = Pipeline('DirectPipelineRunner')
- res = pc | beam.Create('with_inputs', [1, 2])
+ res = pc | 'with_inputs' >> beam.Create([1, 2])
inputs_tr = res.producer.transform
inputs_tr.inputs = ('ci',)
self.assertEqual(
@@ -66,7 +66,7 @@ class PTransformTest(unittest.TestCase):
str(inputs_tr))
pd = Pipeline('DirectPipelineRunner')
- res = pd | beam.Create('with_sidei', [1, 2])
+ res = pd | 'with_sidei' >> beam.Create([1, 2])
side_tr = res.producer.transform
side_tr.side_inputs = (4,)
self.assertEqual(
@@ -110,8 +110,8 @@ class PTransformTest(unittest.TestCase):
return [context.element + addon]
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('start', [1, 2, 3])
- result = pcoll | beam.ParDo('do', AddNDoFn(), 10)
+ pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
+ result = pcoll | 'do' >> beam.ParDo(AddNDoFn(), 10)
assert_that(result, equal_to([11, 12, 13]))
pipeline.run()
@@ -122,21 +122,21 @@ class PTransformTest(unittest.TestCase):
pass
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('start', [1, 2, 3])
+ pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
with self.assertRaises(ValueError):
- pcoll | beam.ParDo('do', MyDoFn) # Note the lack of ()'s
+ pcoll | 'do' >> beam.ParDo(MyDoFn) # Note the lack of ()'s
def test_do_with_callable(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('start', [1, 2, 3])
- result = pcoll | beam.FlatMap('do', lambda x, addon: [x + addon], 10)
+ pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
+ result = pcoll | 'do' >> beam.FlatMap(lambda x, addon: [x + addon], 10)
assert_that(result, equal_to([11, 12, 13]))
pipeline.run()
def test_do_with_side_input_as_arg(self):
pipeline = Pipeline('DirectPipelineRunner')
- side = pipeline | beam.Create('side', [10])
- pcoll = pipeline | beam.Create('start', [1, 2, 3])
+ side = pipeline | 'side' >> beam.Create([10])
+ pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
result = pcoll | beam.FlatMap(
'do', lambda x, addon: [x + addon], pvalue.AsSingleton(side))
assert_that(result, equal_to([11, 12, 13]))
@@ -144,8 +144,8 @@ class PTransformTest(unittest.TestCase):
def test_do_with_side_input_as_keyword_arg(self):
pipeline = Pipeline('DirectPipelineRunner')
- side = pipeline | beam.Create('side', [10])
- pcoll = pipeline | beam.Create('start', [1, 2, 3])
+ side = pipeline | 'side' >> beam.Create([10])
+ pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
result = pcoll | beam.FlatMap(
'do', lambda x, addon: [x + addon], addon=pvalue.AsSingleton(side))
assert_that(result, equal_to([11, 12, 13]))
@@ -153,8 +153,8 @@ class PTransformTest(unittest.TestCase):
def test_do_with_do_fn_returning_string_raises_warning(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('start', ['2', '9', '3'])
- pcoll | beam.FlatMap('do', lambda x: x + '1')
+ pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3'])
+ pcoll | 'do' >> beam.FlatMap(lambda x: x + '1')
# Since the DoFn directly returns a string we should get an error warning
# us.
@@ -167,8 +167,8 @@ class PTransformTest(unittest.TestCase):
def test_do_with_do_fn_returning_dict_raises_warning(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('start', ['2', '9', '3'])
- pcoll | beam.FlatMap('do', lambda x: {x: '1'})
+ pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3'])
+ pcoll | 'do' >> beam.FlatMap(lambda x: {x: '1'})
# Since the DoFn directly returns a dict we should get an error warning
# us.
@@ -181,9 +181,9 @@ class PTransformTest(unittest.TestCase):
def test_do_with_side_outputs_maintains_unique_name(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('start', [1, 2, 3])
- r1 = pcoll | beam.FlatMap('a', lambda x: [x + 1]).with_outputs(main='m')
- r2 = pcoll | beam.FlatMap('b', lambda x: [x + 2]).with_outputs(main='m')
+ pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
+ r1 = pcoll | 'a' >> beam.FlatMap(lambda x: [x + 1]).with_outputs(main='m')
+ r2 = pcoll | 'b' >> beam.FlatMap(lambda x: [x + 2]).with_outputs(main='m')
assert_that(r1.m, equal_to([2, 3, 4]), label='r1')
assert_that(r2.m, equal_to([3, 4, 5]), label='r2')
pipeline.run()
@@ -194,8 +194,8 @@ class PTransformTest(unittest.TestCase):
def incorrect_par_do_fn(x):
return x + 5
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('start', [2, 9, 3])
- pcoll | beam.FlatMap('do', incorrect_par_do_fn)
+ pcoll = pipeline | 'start' >> beam.Create([2, 9, 3])
+ pcoll | 'do' >> beam.FlatMap(incorrect_par_do_fn)
# It's a requirement that all user-defined functions to a ParDo return
# an iterable.
with self.assertRaises(typehints.TypeCheckError) as cm:
@@ -215,8 +215,8 @@ class PTransformTest(unittest.TestCase):
def finish_bundle(self, c):
yield 'finish'
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('start', [1, 2, 3])
- result = pcoll | beam.ParDo('do', MyDoFn())
+ pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
+ result = pcoll | 'do' >> beam.ParDo(MyDoFn())
# May have many bundles, but each has a start and finish.
def matcher():
@@ -230,7 +230,7 @@ class PTransformTest(unittest.TestCase):
def test_filter(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('start', [1, 2, 3, 4])
+ pcoll = pipeline | 'start' >> beam.Create([1, 2, 3, 4])
result = pcoll | beam.Filter(
'filter', lambda x: x % 2 == 0)
assert_that(result, equal_to([2, 4]))
@@ -256,15 +256,15 @@ class PTransformTest(unittest.TestCase):
def test_combine_with_combine_fn(self):
vals = [1, 2, 3, 4, 5, 6, 7]
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('start', vals)
- result = pcoll | beam.CombineGlobally('mean', self._MeanCombineFn())
+ pcoll = pipeline | 'start' >> beam.Create(vals)
+ result = pcoll | 'mean' >> beam.CombineGlobally(self._MeanCombineFn())
assert_that(result, equal_to([sum(vals) / len(vals)]))
pipeline.run()
def test_combine_with_callable(self):
vals = [1, 2, 3, 4, 5, 6, 7]
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('start', vals)
+ pcoll = pipeline | 'start' >> beam.Create(vals)
result = pcoll | beam.CombineGlobally(sum)
assert_that(result, equal_to([sum(vals)]))
pipeline.run()
@@ -272,8 +272,8 @@ class PTransformTest(unittest.TestCase):
def test_combine_with_side_input_as_arg(self):
values = [1, 2, 3, 4, 5, 6, 7]
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('start', values)
- divisor = pipeline | beam.Create('divisor', [2])
+ pcoll = pipeline | 'start' >> beam.Create(values)
+ divisor = pipeline | 'divisor' >> beam.Create([2])
result = pcoll | beam.CombineGlobally(
'max',
# Multiples of divisor only.
@@ -287,9 +287,9 @@ class PTransformTest(unittest.TestCase):
vals_1 = [1, 2, 3, 4, 5, 6, 7]
vals_2 = [2, 4, 6, 8, 10, 12, 14]
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('start', ([('a', x) for x in vals_1] +
+ pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
[('b', x) for x in vals_2]))
- result = pcoll | beam.CombinePerKey('mean', self._MeanCombineFn())
+ result = pcoll | 'mean' >> beam.CombinePerKey(self._MeanCombineFn())
assert_that(result, equal_to([('a', sum(vals_1) / len(vals_1)),
('b', sum(vals_2) / len(vals_2))]))
pipeline.run()
@@ -298,7 +298,7 @@ class PTransformTest(unittest.TestCase):
vals_1 = [1, 2, 3, 4, 5, 6, 7]
vals_2 = [2, 4, 6, 8, 10, 12, 14]
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('start', ([('a', x) for x in vals_1] +
+ pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
[('b', x) for x in vals_2]))
result = pcoll | beam.CombinePerKey(sum)
assert_that(result, equal_to([('a', sum(vals_1)), ('b', sum(vals_2))]))
@@ -308,9 +308,9 @@ class PTransformTest(unittest.TestCase):
vals_1 = [1, 2, 3, 4, 5, 6, 7]
vals_2 = [2, 4, 6, 8, 10, 12, 14]
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('start', ([('a', x) for x in vals_1] +
+ pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
[('b', x) for x in vals_2]))
- divisor = pipeline | beam.Create('divisor', [2])
+ divisor = pipeline | 'divisor' >> beam.Create([2])
result = pcoll | beam.CombinePerKey(
lambda vals, d: max(v for v in vals if v % d == 0),
pvalue.AsSingleton(divisor)) # Multiples of divisor only.
@@ -323,7 +323,7 @@ class PTransformTest(unittest.TestCase):
pipeline = Pipeline('DirectPipelineRunner')
pcoll = pipeline | beam.Create(
'start', [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)])
- result = pcoll | beam.GroupByKey('group')
+ result = pcoll | 'group' >> beam.GroupByKey()
assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
pipeline.run()
@@ -335,9 +335,9 @@ class PTransformTest(unittest.TestCase):
return (context.element % 3) + offset
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('start', [0, 1, 2, 3, 4, 5, 6, 7, 8])
+ pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
# Attempt nominal partition operation.
- partitions = pcoll | beam.Partition('part1', SomePartitionFn(), 4, 1)
+ partitions = pcoll | 'part1' >> beam.Partition(SomePartitionFn(), 4, 1)
assert_that(partitions[0], equal_to([]))
assert_that(partitions[1], equal_to([0, 3, 6]), label='p1')
assert_that(partitions[2], equal_to([1, 4, 7]), label='p2')
@@ -347,14 +347,14 @@ class PTransformTest(unittest.TestCase):
# Check that a bad partition label will yield an error. For the
# DirectPipelineRunner, this error manifests as an exception.
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('start', [0, 1, 2, 3, 4, 5, 6, 7, 8])
- partitions = pcoll | beam.Partition('part2', SomePartitionFn(), 4, 10000)
+ pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
+ partitions = pcoll | 'part2' >> beam.Partition(SomePartitionFn(), 4, 10000)
with self.assertRaises(ValueError):
pipeline.run()
def test_partition_with_callable(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('start', [0, 1, 2, 3, 4, 5, 6, 7, 8])
+ pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
partitions = (
pcoll | beam.Partition(
'part',
@@ -370,49 +370,49 @@ class PTransformTest(unittest.TestCase):
"""Regression test for an issue with how partitions are handled."""
pipeline = Pipeline('DirectPipelineRunner')
contents = [('aa', 1), ('bb', 2), ('aa', 2)]
- created = pipeline | beam.Create('A', contents)
- partitioned = created | beam.Partition('B', lambda x, n: len(x) % n, 3)
- flattened = partitioned | beam.Flatten('C')
- grouped = flattened | beam.GroupByKey('D')
+ created = pipeline | 'A' >> beam.Create(contents)
+ partitioned = created | 'B' >> beam.Partition(lambda x, n: len(x) % n, 3)
+ flattened = partitioned | 'C' >> beam.Flatten()
+ grouped = flattened | 'D' >> beam.GroupByKey()
assert_that(grouped, equal_to([('aa', [1, 2]), ('bb', [2])]))
pipeline.run()
def test_flatten_pcollections(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcoll_1 = pipeline | beam.Create('start_1', [0, 1, 2, 3])
- pcoll_2 = pipeline | beam.Create('start_2', [4, 5, 6, 7])
- result = (pcoll_1, pcoll_2) | beam.Flatten('flatten')
+ pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3])
+ pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7])
+ result = (pcoll_1, pcoll_2) | 'flatten' >> beam.Flatten()
assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7]))
pipeline.run()
def test_flatten_no_pcollections(self):
pipeline = Pipeline('DirectPipelineRunner')
with self.assertRaises(ValueError):
- () | beam.Flatten('pipeline arg missing')
- result = () | beam.Flatten('empty', pipeline=pipeline)
+ () | 'pipeline arg missing' >> beam.Flatten()
+ result = () | 'empty' >> beam.Flatten(pipeline=pipeline)
assert_that(result, equal_to([]))
pipeline.run()
def test_flatten_pcollections_in_iterable(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcoll_1 = pipeline | beam.Create('start_1', [0, 1, 2, 3])
- pcoll_2 = pipeline | beam.Create('start_2', [4, 5, 6, 7])
+ pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3])
+ pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7])
result = ([pcoll for pcoll in (pcoll_1, pcoll_2)]
- | beam.Flatten('flatten'))
+ | 'flatten' >> beam.Flatten())
assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7]))
pipeline.run()
def test_flatten_input_type_must_be_iterable(self):
# Inputs to flatten *must* be an iterable.
with self.assertRaises(ValueError):
- 4 | beam.Flatten('flatten')
+ 4 | 'flatten' >> beam.Flatten()
def test_flatten_input_type_must_be_iterable_of_pcolls(self):
# Inputs to flatten *must* be an iterable of PCollections.
with self.assertRaises(TypeError):
- {'l': 'test'} | beam.Flatten('flatten')
+ {'l': 'test'} | 'flatten' >> beam.Flatten()
with self.assertRaises(TypeError):
- set([1, 2, 3]) | beam.Flatten('flatten')
+ set([1, 2, 3]) | 'flatten' >> beam.Flatten()
def test_co_group_by_key_on_list(self):
pipeline = Pipeline('DirectPipelineRunner')
@@ -420,7 +420,7 @@ class PTransformTest(unittest.TestCase):
'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
pcoll_2 = pipeline | beam.Create(
'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
- result = (pcoll_1, pcoll_2) | beam.CoGroupByKey('cgbk')
+ result = (pcoll_1, pcoll_2) | 'cgbk' >> beam.CoGroupByKey()
assert_that(result, equal_to([('a', ([1, 2], [5, 6])),
('b', ([3], [])),
('c', ([4], [7, 8]))]))
@@ -433,7 +433,7 @@ class PTransformTest(unittest.TestCase):
pcoll_2 = pipeline | beam.Create(
'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
result = ([pc for pc in (pcoll_1, pcoll_2)]
- | beam.CoGroupByKey('cgbk'))
+ | 'cgbk' >> beam.CoGroupByKey())
assert_that(result, equal_to([('a', ([1, 2], [5, 6])),
('b', ([3], [])),
('c', ([4], [7, 8]))]))
@@ -445,7 +445,7 @@ class PTransformTest(unittest.TestCase):
'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
pcoll_2 = pipeline | beam.Create(
'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
- result = {'X': pcoll_1, 'Y': pcoll_2} | beam.CoGroupByKey('cgbk')
+ result = {'X': pcoll_1, 'Y': pcoll_2} | 'cgbk' >> beam.CoGroupByKey()
assert_that(result, equal_to([('a', {'X': [1, 2], 'Y': [5, 6]}),
('b', {'X': [3], 'Y': []}),
('c', {'X': [4], 'Y': [7, 8]})]))
@@ -453,10 +453,10 @@ class PTransformTest(unittest.TestCase):
def test_group_by_key_input_must_be_kv_pairs(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcolls = pipeline | beam.Create('A', [1, 2, 3, 4, 5])
+ pcolls = pipeline | 'A' >> beam.Create([1, 2, 3, 4, 5])
with self.assertRaises(typehints.TypeCheckError) as e:
- pcolls | beam.GroupByKey('D')
+ pcolls | 'D' >> beam.GroupByKey()
pipeline.run()
self.assertStartswith(
@@ -466,9 +466,9 @@ class PTransformTest(unittest.TestCase):
def test_group_by_key_only_input_must_be_kv_pairs(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcolls = pipeline | beam.Create('A', ['a', 'b', 'f'])
+ pcolls = pipeline | 'A' >> beam.Create(['a', 'b', 'f'])
with self.assertRaises(typehints.TypeCheckError) as cm:
- pcolls | beam.GroupByKeyOnly('D')
+ pcolls | 'D' >> beam.GroupByKeyOnly()
pipeline.run()
expected_error_prefix = ('Input type hint violation at D: expected '
@@ -506,20 +506,20 @@ class PTransformTest(unittest.TestCase):
t = (beam.Map(lambda x: (x, 1))
| beam.GroupByKey()
| beam.Map(lambda (x, ones): (x, sum(ones))))
- result = pipeline | beam.Create('start', ['a', 'a', 'b']) | t
+ result = pipeline | 'start' >> beam.Create(['a', 'a', 'b']) | t
assert_that(result, equal_to([('a', 2), ('b', 1)]))
pipeline.run()
def test_apply_to_list(self):
self.assertItemsEqual(
- [1, 2, 3], [0, 1, 2] | beam.Map('add_one', lambda x: x + 1))
- self.assertItemsEqual([1], [0, 1, 2] | beam.Filter('odd', lambda x: x % 2))
+ [1, 2, 3], [0, 1, 2] | 'add_one' >> beam.Map(lambda x: x + 1))
+ self.assertItemsEqual([1], [0, 1, 2] | 'odd' >> beam.Filter(lambda x: x % 2))
self.assertItemsEqual([1, 2, 100, 3],
- ([1, 2, 3], [100]) | beam.Flatten('flat'))
+ ([1, 2, 3], [100]) | 'flat' >> beam.Flatten())
join_input = ([('k', 'a')],
[('k', 'b'), ('k', 'c')])
self.assertItemsEqual([('k', (['a'], ['b', 'c']))],
- join_input | beam.CoGroupByKey('join'))
+ join_input | 'join' >> beam.CoGroupByKey())
def test_multi_input_ptransform(self):
class DisjointUnion(PTransform):
@@ -583,7 +583,7 @@ class PTransformLabelsTest(unittest.TestCase):
gbk = beam.GroupByKey('gbk')
map2 = beam.Map('map2', lambda (x, ones): (x, sum(ones)))
t = (map1 | gbk | map2)
- result = pipeline | beam.Create('start', ['a', 'a', 'b']) | t
+ result = pipeline | 'start' >> beam.Create(['a', 'a', 'b']) | t
self.assertTrue('map1|gbk|map2/map1' in pipeline.applied_labels)
self.assertTrue('map1|gbk|map2/gbk' in pipeline.applied_labels)
self.assertTrue('map1|gbk|map2/map2' in pipeline.applied_labels)
@@ -592,7 +592,7 @@ class PTransformLabelsTest(unittest.TestCase):
def test_apply_custom_transform_without_label(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('pcoll', [1, 2, 3])
+ pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
custom = PTransformLabelsTest.CustomTransform()
result = pipeline.apply(custom, pcoll)
self.assertTrue('CustomTransform' in pipeline.applied_labels)
@@ -602,7 +602,7 @@ class PTransformLabelsTest(unittest.TestCase):
def test_apply_custom_transform_with_label(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('pcoll', [1, 2, 3])
+ pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
custom = PTransformLabelsTest.CustomTransform('*custom*')
result = pipeline.apply(custom, pcoll)
self.assertTrue('*custom*' in pipeline.applied_labels)
@@ -613,7 +613,7 @@ class PTransformLabelsTest(unittest.TestCase):
def test_combine_without_label(self):
vals = [1, 2, 3, 4, 5, 6, 7]
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('start', vals)
+ pcoll = pipeline | 'start' >> beam.Create(vals)
combine = beam.CombineGlobally(sum)
result = pcoll | combine
self.assertTrue('CombineGlobally(sum)' in pipeline.applied_labels)
@@ -622,7 +622,7 @@ class PTransformLabelsTest(unittest.TestCase):
def test_apply_ptransform_using_decorator(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('pcoll', [1, 2, 3])
+ pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
sample = SamplePTransform('*sample*')
_ = pcoll | sample
self.assertTrue('*sample*' in pipeline.applied_labels)
@@ -633,7 +633,7 @@ class PTransformLabelsTest(unittest.TestCase):
def test_combine_with_label(self):
vals = [1, 2, 3, 4, 5, 6, 7]
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Create('start', vals)
+ pcoll = pipeline | 'start' >> beam.Create(vals)
combine = beam.CombineGlobally('*sum*', sum)
result = pcoll | combine
self.assertTrue('*sum*' in pipeline.applied_labels)
@@ -642,7 +642,7 @@ class PTransformLabelsTest(unittest.TestCase):
def check_label(self, ptransform, expected_label):
pipeline = Pipeline('DirectPipelineRunner')
- pipeline | beam.Create('start', [('a', 1)]) | ptransform
+ pipeline | 'start' >> beam.Create([('a', 1)]) | ptransform
actual_label = sorted(pipeline.applied_labels - {'start'})[0]
self.assertEqual(expected_label, re.sub(r'\d{3,}', '#', actual_label))
@@ -679,8 +679,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
return [context.element + five]
d = (self.p
- | beam.Create('t', [1, 2, 3]).with_output_types(int)
- | beam.ParDo('add', AddWithFive(), 5))
+ | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+ | 'add' >> beam.ParDo(AddWithFive(), 5))
assert_that(d, equal_to([6, 7, 8]))
self.p.run()
@@ -694,8 +694,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('t', [1, 2, 3]).with_output_types(int)
- | beam.ParDo('upper', ToUpperCaseWithPrefix(), 'hello'))
+ | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+ | 'upper' >> beam.ParDo(ToUpperCaseWithPrefix(), 'hello'))
self.assertEqual("Type hint violation for 'upper': "
"requires <type 'str'> but got <type 'int'> for context",
@@ -711,8 +711,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
return [context.element + num]
d = (self.p
- | beam.Create('t', [1, 2, 3]).with_output_types(int)
- | beam.ParDo('add', AddWithNum(), 5))
+ | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+ | 'add' >> beam.ParDo(AddWithNum(), 5))
assert_that(d, equal_to([6, 7, 8]))
self.p.run()
@@ -728,8 +728,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('t', ['1', '2', '3']).with_output_types(str)
- | beam.ParDo('add', AddWithNum(), 5))
+ | 't' >> beam.Create(['1', '2', '3']).with_output_types(str)
+ | 'add' >> beam.ParDo(AddWithNum(), 5))
self.p.run()
self.assertEqual("Type hint violation for 'add': "
@@ -746,8 +746,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# will receive a str instead, which should result in a raised exception.
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('s', ['b', 'a', 'r']).with_output_types(str)
- | beam.FlatMap('to str', int_to_str))
+ | 's' >> beam.Create(['b', 'a', 'r']).with_output_types(str)
+ | 'to str' >> beam.FlatMap(int_to_str))
self.assertEqual("Type hint violation for 'to str': "
"requires <type 'int'> but got <type 'str'> for a",
@@ -761,8 +761,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# If this type-checks than no error should be raised.
d = (self.p
- | beam.Create('t', ['t', 'e', 's', 't']).with_output_types(str)
- | beam.FlatMap('case', to_all_upper_case))
+ | 't' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+ | 'case' >> beam.FlatMap(to_all_upper_case))
assert_that(d, equal_to(['T', 'E', 'S', 'T']))
self.p.run()
@@ -775,10 +775,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# expecting pcoll's of type str instead.
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('s', ['t', 'e', 's', 't']).with_output_types(str)
- | (beam.FlatMap('score', lambda x: [1] if x == 't' else [2])
+ | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+ | 'score' >> (beam.FlatMap(lambda x: [1] if x == 't' else [2])
.with_input_types(str).with_output_types(int))
- | (beam.FlatMap('upper', lambda x: [x.upper()])
+ | 'upper' >> (beam.FlatMap(lambda x: [x.upper()])
.with_input_types(str).with_output_types(str)))
self.assertEqual("Type hint violation for 'upper': "
@@ -788,10 +788,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_pardo_properly_type_checks_using_type_hint_methods(self):
# Pipeline should be created successfully without an error
d = (self.p
- | beam.Create('s', ['t', 'e', 's', 't']).with_output_types(str)
- | beam.FlatMap('dup', lambda x: [x + x])
+ | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+ | 'dup' >> beam.FlatMap(lambda x: [x + x])
.with_input_types(str).with_output_types(str)
- | beam.FlatMap('upper', lambda x: [x.upper()])
+ | 'upper' >> beam.FlatMap(lambda x: [x.upper()])
.with_input_types(str).with_output_types(str))
assert_that(d, equal_to(['TT', 'EE', 'SS', 'TT']))
@@ -802,8 +802,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# int's, while Map is expecting one of str.
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('s', [1, 2, 3, 4]).with_output_types(int)
- | beam.Map('upper', lambda x: x.upper())
+ | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+ | 'upper' >> beam.Map(lambda x: x.upper())
.with_input_types(str).with_output_types(str))
self.assertEqual("Type hint violation for 'upper': "
@@ -813,8 +813,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_map_properly_type_checks_using_type_hints_methods(self):
# No error should be raised if this type-checks properly.
d = (self.p
- | beam.Create('s', [1, 2, 3, 4]).with_output_types(int)
- | beam.Map('to_str', lambda x: str(x))
+ | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+ | 'to_str' >> beam.Map(lambda x: str(x))
.with_input_types(int).with_output_types(str))
assert_that(d, equal_to(['1', '2', '3', '4']))
self.p.run()
@@ -829,8 +829,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# However, 'Map' should detect that Create has hinted an int instead.
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('s', [1, 2, 3, 4]).with_output_types(int)
- | beam.Map('upper', upper))
+ | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+ | 'upper' >> beam.Map(upper))
self.assertEqual("Type hint violation for 'upper': "
"requires <type 'str'> but got <type 'int'> for s",
@@ -844,8 +844,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# If this type-checks than no error should be raised.
d = (self.p
- | beam.Create('bools', [True, False, True]).with_output_types(bool)
- | beam.Map('to_ints', bool_to_int))
+ | 'bools' >> beam.Create([True, False, True]).with_output_types(bool)
+ | 'to_ints' >> beam.Map(bool_to_int))
assert_that(d, equal_to([1, 0, 1]))
self.p.run()
@@ -854,10 +854,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# incoming.
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('strs', ['1', '2', '3', '4', '5']).with_output_types(str)
- | beam.Map('lower', lambda x: x.lower())
+ | 'strs' >> beam.Create(['1', '2', '3', '4', '5']).with_output_types(str)
+ | 'lower' >> beam.Map(lambda x: x.lower())
.with_input_types(str).with_output_types(str)
- | beam.Filter('below 3', lambda x: x < 3).with_input_types(int))
+ | 'below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))
self.assertEqual("Type hint violation for 'below 3': "
"requires <type 'int'> but got <type 'str'> for x",
@@ -866,10 +866,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_filter_type_checks_using_type_hints_method(self):
# No error should be raised if this type-checks properly.
d = (self.p
- | beam.Create('strs', ['1', '2', '3', '4', '5']).with_output_types(str)
- | beam.Map('to int', lambda x: int(x))
+ | 'strs' >> beam.Create(['1', '2', '3', '4', '5']).with_output_types(str)
+ | 'to int' >> beam.Map(lambda x: int(x))
.with_input_types(str).with_output_types(int)
- | beam.Filter('below 3', lambda x: x < 3).with_input_types(int))
+ | 'below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))
assert_that(d, equal_to([1, 2]))
self.p.run()
@@ -881,8 +881,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# Func above was hinted to only take a float, yet an int will be passed.
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('ints', [1, 2, 3, 4]).with_output_types(int)
- | beam.Filter('half', more_than_half))
+ | 'ints' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+ | 'half' >> beam.Filter(more_than_half))
self.assertEqual("Type hint violation for 'half': "
"requires <type 'float'> but got <type 'int'> for a",
@@ -896,17 +896,17 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# Filter should deduce that it returns the same type that it takes.
(self.p
- | beam.Create('str', range(5)).with_output_types(int)
- | beam.Filter('half', half)
- | beam.Map('to bool', lambda x: bool(x))
+ | 'str' >> beam.Create(range(5)).with_output_types(int)
+ | 'half' >> beam.Filter(half)
+ | 'to bool' >> beam.Map(lambda x: bool(x))
.with_input_types(int).with_output_types(bool))
def test_group_by_key_only_output_type_deduction(self):
d = (self.p
- | beam.Create('str', ['t', 'e', 's', 't']).with_output_types(str)
- | (beam.Map('pair', lambda x: (x, ord(x)))
+ | 'str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+ | 'pair' >> (beam.Map(lambda x: (x, ord(x)))
.with_output_types(typehints.KV[str, str]))
- | beam.GroupByKeyOnly('O'))
+ | 'O' >> beam.GroupByKeyOnly())
# Output type should correctly be deduced.
# GBK-only should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
@@ -915,10 +915,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_group_by_key_output_type_deduction(self):
d = (self.p
- | beam.Create('str', range(20)).with_output_types(int)
- | (beam.Map('pair negative', lambda x: (x % 5, -x))
+ | 'str' >> beam.Create(range(20)).with_output_types(int)
+ | 'pair negative' >> (beam.Map(lambda x: (x % 5, -x))
.with_output_types(typehints.KV[int, int]))
- | beam.GroupByKey('T'))
+ | 'T' >> beam.GroupByKey())
# Output type should correctly be deduced.
# GBK should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
@@ -929,8 +929,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# GBK will be passed raw int's here instead of some form of KV[A, B].
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('s', [1, 2, 3]).with_output_types(int)
- | beam.GroupByKeyOnly('F'))
+ | 's' >> beam.Create([1, 2, 3]).with_output_types(int)
+ | 'F' >> beam.GroupByKeyOnly())
self.assertEqual("Input type hint violation at F: "
"expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -942,9 +942,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# aliased to Tuple[int, str].
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | (beam.Create('s', range(5))
+ | 's' >> (beam.Create(range(5))
.with_output_types(typehints.Iterable[int]))
- | beam.GroupByKey('T'))
+ | 'T' >> beam.GroupByKey())
self.assertEqual("Input type hint violation at T: "
"expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -958,8 +958,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# information to the ParDo.
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('nums', range(5))
- | beam.FlatMap('mod dup', lambda x: (x % 2, x)))
+ | 'nums' >> beam.Create(range(5))
+ | 'mod dup' >> beam.FlatMap(lambda x: (x % 2, x)))
self.assertEqual('Pipeline type checking is enabled, however no output '
'type-hint was found for the PTransform Create(nums)',
@@ -971,9 +971,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# information to GBK-only.
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('nums', range(5)).with_output_types(int)
- | beam.Map('mod dup', lambda x: (x % 2, x))
- | beam.GroupByKeyOnly('G'))
+ | 'nums' >> beam.Create(range(5)).with_output_types(int)
+ | 'mod dup' >> beam.Map(lambda x: (x % 2, x))
+ | 'G' >> beam.GroupByKeyOnly())
self.assertEqual('Pipeline type checking is enabled, however no output '
'type-hint was found for the PTransform '
@@ -986,8 +986,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# The pipeline below should raise a TypeError, however pipeline type
# checking was disabled above.
(self.p
- | beam.Create('t', [1, 2, 3]).with_output_types(int)
- | beam.Map('lower', lambda x: x.lower())
+ | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+ | 'lower' >> beam.Map(lambda x: x.lower())
.with_input_types(str).with_output_types(str))
def test_run_time_type_checking_enabled_type_violation(self):
@@ -1002,8 +1002,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# Function above has been type-hinted to only accept an int. But in the
# pipeline execution it'll be passed a string due to the output of Create.
(self.p
- | beam.Create('t', ['some_string'])
- | beam.Map('to str', int_to_string))
+ | 't' >> beam.Create(['some_string'])
+ | 'to str' >> beam.Map(int_to_string))
with self.assertRaises(typehints.TypeCheckError) as e:
self.p.run()
@@ -1026,10 +1026,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# Pipeline checking is off, but the above function should satisfy types at
# run-time.
result = (self.p
- | beam.Create('t', ['t', 'e', 's', 't', 'i', 'n', 'g'])
+ | 't' >> beam.Create(['t', 'e', 's', 't', 'i', 'n', 'g'])
.with_output_types(str)
- | beam.Map('gen keys', group_with_upper_ord)
- | beam.GroupByKey('O'))
+ | 'gen keys' >> beam.Map(group_with_upper_ord)
+ | 'O' >> beam.GroupByKey())
assert_that(result, equal_to([(1, ['g']),
(3, ['s', 'i', 'n']),
@@ -1048,9 +1048,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
return (a % 2, a)
(self.p
- | beam.Create('nums', range(5)).with_output_types(int)
- | beam.Map('is even', is_even_as_key)
- | beam.GroupByKey('parity'))
+ | 'nums' >> beam.Create(range(5)).with_output_types(int)
+ | 'is even' >> beam.Map(is_even_as_key)
+ | 'parity' >> beam.GroupByKey())
# Although all the types appear to be correct when checked at pipeline
# construction. Runtime type-checking should detect the 'is_even_as_key' is
@@ -1077,9 +1077,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
return (a % 2 == 0, a)
result = (self.p
- | beam.Create('nums', range(5)).with_output_types(int)
- | beam.Map('is even', is_even_as_key)
- | beam.GroupByKey('parity'))
+ | 'nums' >> beam.Create(range(5)).with_output_types(int)
+ | 'is even' >> beam.Map(is_even_as_key)
+ | 'parity' >> beam.GroupByKey())
assert_that(result, equal_to([(False, [1, 3]), (True, [0, 2, 4])]))
self.p.run()
@@ -1092,8 +1092,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# ParDo should receive an instance of type 'str', however an 'int' will be
# passed instead.
with self.assertRaises(typehints.TypeCheckError) as e:
- (self.p | beam.Create('n', [1, 2, 3])
- | (beam.FlatMap('to int', lambda x: [int(x)])
+ (self.p | 'n' >> beam.Create([1, 2, 3])
+ | 'to int' >> (beam.FlatMap(lambda x: [int(x)])
.with_input_types(str).with_output_types(int))
)
self.p.run()
@@ -1111,8 +1111,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('n', [(1, 3.0), (2, 4.9), (3, 9.5)])
- | (beam.FlatMap('add', lambda (x, y): [x + y])
+ | 'n' >> beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
+ | 'add' >> (beam.FlatMap(lambda (x, y): [x + y])
.with_input_types(typehints.Tuple[int, int]).with_output_types(int))
)
self.p.run()
@@ -1136,8 +1136,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
lambda x: [float(x)]).with_input_types(int).with_output_types(
int).get_type_hints()
with self.assertRaises(typehints.TypeCheckError) as e:
- (self.p | beam.Create('n', [1, 2, 3])
- | (beam.FlatMap('to int', lambda x: [float(x)])
+ (self.p | 'n' >> beam.Create([1, 2, 3])
+ | 'to int' >> (beam.FlatMap(lambda x: [float(x)])
.with_input_types(int).with_output_types(int))
)
self.p.run()
@@ -1159,8 +1159,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# of 'int' will be generated instead.
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('n', [(1, 3.0), (2, 4.9), (3, 9.5)])
- | (beam.FlatMap('swap', lambda (x, y): [x + y])
+ | 'n' >> beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
+ | 'swap' >> (beam.FlatMap(lambda (x, y): [x + y])
.with_input_types(typehints.Tuple[int, float])
.with_output_types(typehints.Tuple[float, int]))
)
@@ -1183,7 +1183,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
return a + b
with self.assertRaises(typehints.TypeCheckError) as e:
- (self.p | beam.Create('t', [1, 2, 3, 4]) | beam.Map('add 1', add, 1.0))
+ (self.p | 't' >> beam.Create([1, 2, 3, 4]) | 'add 1' >> beam.Map(add, 1.0))
self.p.run()
self.assertStartswith(
@@ -1199,8 +1199,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('t', [1, 2, 3, 4])
- | (beam.Map('add 1', lambda x, one: x + one, 1.0)
+ | 't' >> beam.Create([1, 2, 3, 4])
+ | 'add 1' >> (beam.Map(lambda x, one: x + one, 1.0)
.with_input_types(int, int)
.with_output_types(float)))
self.p.run()
@@ -1219,8 +1219,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
return sum(ints)
d = (self.p
- | beam.Create('t', [1, 2, 3]).with_output_types(int)
- | beam.CombineGlobally('sum', sum_ints))
+ | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+ | 'sum' >> beam.CombineGlobally(sum_ints))
self.assertEqual(int, d.element_type)
assert_that(d, equal_to([6]))
@@ -1234,8 +1234,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('m', [1, 2, 3]).with_output_types(int)
- | beam.CombineGlobally('add', bad_combine))
+ | 'm' >> beam.Create([1, 2, 3]).with_output_types(int)
+ | 'add' >> beam.CombineGlobally(bad_combine))
self.assertEqual(
"All functions for a Combine PTransform must accept a "
@@ -1255,9 +1255,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
return list(range(n+1))
d = (self.p
- | beam.Create('t', [1, 2, 3]).with_output_types(int)
- | beam.CombineGlobally('sum', sum_ints)
- | beam.ParDo('range', range_from_zero))
+ | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+ | 'sum' >> beam.CombineGlobally(sum_ints)
+ | 'range' >> beam.ParDo(range_from_zero))
self.assertEqual(int, d.element_type)
assert_that(d, equal_to([0, 1, 2, 3, 4, 5, 6]))
@@ -1272,8 +1272,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
return reduce(operator.mul, ints, 1)
d = (self.p
- | beam.Create('k', [5, 5, 5, 5]).with_output_types(int)
- | beam.CombineGlobally('mul', iter_mul))
+ | 'k' >> beam.Create([5, 5, 5, 5]).with_output_types(int)
+ | 'mul' >> beam.CombineGlobally(iter_mul))
assert_that(d, equal_to([625]))
self.p.run()
@@ -1290,8 +1290,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('k', [5, 5, 5, 5]).with_output_types(int)
- | beam.CombineGlobally('mul', iter_mul))
+ | 'k' >> beam.Create([5, 5, 5, 5]).with_output_types(int)
+ | 'mul' >> beam.CombineGlobally(iter_mul))
self.p.run()
self.assertStartswith(
@@ -1305,8 +1305,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_combine_pipeline_type_check_using_methods(self):
d = (self.p
- | beam.Create('s', ['t', 'e', 's', 't']).with_output_types(str)
- | (beam.CombineGlobally('concat', lambda s: ''.join(s))
+ | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+ | 'concat' >> (beam.CombineGlobally(lambda s: ''.join(s))
.with_input_types(str).with_output_types(str)))
def matcher(expected):
@@ -1321,8 +1321,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | beam.Create('s', range(5)).with_output_types(int)
- | (beam.CombineGlobally('sum', lambda s: sum(s))
+ | 's' >> beam.Create(range(5)).with_output_types(int)
+ | 'sum' >> (beam.CombineGlobally(lambda s: sum(s))
.with_input_types(int).with_output_types(int)))
assert_that(d, equal_to([10]))
@@ -1331,8 +1331,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_combine_pipeline_type_check_violation_using_methods(self):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('e', range(3)).with_output_types(int)
- | (beam.CombineGlobally('sort join', lambda s: ''.join(sorted(s)))
+ | 'e' >> beam.Create(range(3)).with_output_types(int)
+ | 'sort join' >> (beam.CombineGlobally(lambda s: ''.join(sorted(s)))
.with_input_types(str).with_output_types(str)))
self.assertEqual("Input type hint violation at sort join: "
@@ -1345,8 +1345,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('e', range(3)).with_output_types(int)
- | (beam.CombineGlobally('sort join', lambda s: ''.join(sorted(s)))
+ | 'e' >> beam.Create(range(3)).with_output_types(int)
+ | 'sort join' >> (beam.CombineGlobally(lambda s: ''.join(sorted(s)))
.with_input_types(str).with_output_types(str)))
self.p.run()
@@ -1363,9 +1363,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('e', range(3)).with_output_types(int)
- | beam.CombineGlobally('sort join', lambda s: ''.join(sorted(s)))
- | beam.Map('f', lambda x: x + 1))
+ | 'e' >> beam.Create(range(3)).with_output_types(int)
+ | 'sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
+ | 'f' >> beam.Map(lambda x: x + 1))
self.assertEqual(
'Pipeline type checking is enabled, '
@@ -1375,8 +1375,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_mean_globally_pipeline_checking_satisfied(self):
d = (self.p
- | beam.Create('c', range(5)).with_output_types(int)
- | combine.Mean.Globally('mean'))
+ | 'c' >> beam.Create(range(5)).with_output_types(int)
+ | 'mean' >> combine.Mean.Globally())
self.assertTrue(d.element_type is float)
assert_that(d, equal_to([2.0]))
@@ -1385,8 +1385,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_mean_globally_pipeline_checking_violated(self):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('c', ['test']).with_output_types(str)
- | combine.Mean.Globally('mean'))
+ | 'c' >> beam.Create(['test']).with_output_types(str)
+ | 'mean' >> combine.Mean.Globally())
self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
"requires Tuple[TypeVariable[K], "
@@ -1398,8 +1398,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | beam.Create('c', range(5)).with_output_types(int)
- | combine.Mean.Globally('mean'))
+ | 'c' >> beam.Create(range(5)).with_output_types(int)
+ | 'mean' >> combine.Mean.Globally())
self.assertTrue(d.element_type is float)
assert_that(d, equal_to([2.0]))
@@ -1411,8 +1411,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('c', ['t', 'e', 's', 't']).with_output_types(str)
- | combine.Mean.Globally('mean'))
+ | 'c' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+ | 'mean' >> combine.Mean.Globally())
self.p.run()
self.assertEqual("Runtime type violation detected for transform input "
"when executing ParDoFlatMap(Combine): Tuple[Any, "
@@ -1427,10 +1427,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_mean_per_key_pipeline_checking_satisfied(self):
d = (self.p
- | beam.Create('c', range(5)).with_output_types(int)
- | (beam.Map('even group', lambda x: (not x % 2, x))
+ | 'c' >> beam.Create(range(5)).with_output_types(int)
+ | 'even group' >> (beam.Map(lambda x: (not x % 2, x))
.with_output_types(typehints.KV[bool, int]))
- | combine.Mean.PerKey('even mean'))
+ | 'even mean' >> combine.Mean.PerKey())
self.assertCompatible(typehints.KV[bool, float], d.element_type)
assert_that(d, equal_to([(False, 2.0), (True, 2.0)]))
@@ -1439,10 +1439,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_mean_per_key_pipeline_checking_violated(self):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('e', map(str, range(5))).with_output_types(str)
- | (beam.Map('upper pair', lambda x: (x.upper(), x))
+ | 'e' >> beam.Create(map(str, range(5))).with_output_types(str)
+ | 'upper pair' >> (beam.Map(lambda x: (x.upper(), x))
.with_output_types(typehints.KV[str, str]))
- | combine.Mean.PerKey('even mean'))
+ | 'even mean' >> combine.Mean.PerKey())
self.p.run()
self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
@@ -1455,10 +1455,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | beam.Create('c', range(5)).with_output_types(int)
- | (beam.Map('odd group', lambda x: (bool(x % 2), x))
+ | 'c' >> beam.Create(range(5)).with_output_types(int)
+ | 'odd group' >> (beam.Map(lambda x: (bool(x % 2), x))
.with_output_types(typehints.KV[bool, int]))
- | combine.Mean.PerKey('odd mean'))
+ | 'odd mean' >> combine.Mean.PerKey())
self.assertCompatible(typehints.KV[bool, float], d.element_type)
assert_that(d, equal_to([(False, 2.0), (True, 2.0)]))
@@ -1470,10 +1470,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('c', range(5)).with_output_types(int)
- | (beam.Map('odd group', lambda x: (x, str(bool(x % 2))))
+ | 'c' >> beam.Create(range(5)).with_output_types(int)
+ | 'odd group' >> (beam.Map(lambda x: (x, str(bool(x % 2))))
.with_output_types(typehints.KV[int, str]))
- | combine.Mean.PerKey('odd mean'))
+ | 'odd mean' >> combine.Mean.PerKey())
self.p.run()
self.assertStartswith(
@@ -1494,8 +1494,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_count_globally_pipeline_type_checking_satisfied(self):
d = (self.p
- | beam.Create('p', range(5)).with_output_types(int)
- | combine.Count.Globally('count int'))
+ | 'p' >> beam.Create(range(5)).with_output_types(int)
+ | 'count int' >> combine.Count.Globally())
self.assertTrue(d.element_type is int)
assert_that(d, equal_to([5]))
@@ -1505,8 +1505,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | beam.Create('p', range(5)).with_output_types(int)
- | combine.Count.Globally('count int'))
+ | 'p' >> beam.Create(range(5)).with_output_types(int)
+ | 'count int' >> combine.Count.Globally())
self.assertTrue(d.element_type is int)
assert_that(d, equal_to([5]))
@@ -1514,10 +1514,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_count_perkey_pipeline_type_checking_satisfied(self):
d = (self.p
- | beam.Create('p', range(5)).with_output_types(int)
- | (beam.Map('even group', lambda x: (not x % 2, x))
+ | 'p' >> beam.Create(range(5)).with_output_types(int)
+ | 'even group' >> (beam.Map(lambda x: (not x % 2, x))
.with_output_types(typehints.KV[bool, int]))
- | combine.Count.PerKey('count int'))
+ | 'count int' >> combine.Count.PerKey())
self.assertCompatible(typehints.KV[bool, int], d.element_type)
assert_that(d, equal_to([(False, 2), (True, 3)]))
@@ -1526,8 +1526,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_count_perkey_pipeline_type_checking_violated(self):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('p', range(5)).with_output_types(int)
- | combine.Count.PerKey('count int'))
+ | 'p' >> beam.Create(range(5)).with_output_types(int)
+ | 'count int' >> combine.Count.PerKey())
self.assertEqual("Input type hint violation at GroupByKey: "
"expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -1538,10 +1538,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | beam.Create('c', ['t', 'e', 's', 't']).with_output_types(str)
- | beam.Map('dup key', lambda x: (x, x))
+ | 'c' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+ | 'dup key' >> beam.Map(lambda x: (x, x))
.with_output_types(typehints.KV[str, str])
- | combine.Count.PerKey('count dups'))
+ | 'count dups' >> combine.Count.PerKey())
self.assertCompatible(typehints.KV[str, int], d.element_type)
assert_that(d, equal_to([('e', 1), ('s', 1), ('t', 2)]))
@@ -1549,8 +1549,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_count_perelement_pipeline_type_checking_satisfied(self):
d = (self.p
- | beam.Create('w', [1, 1, 2, 3]).with_output_types(int)
- | combine.Count.PerElement('count elems'))
+ | 'w' >> beam.Create([1, 1, 2, 3]).with_output_types(int)
+ | 'count elems' >> combine.Count.PerElement())
self.assertCompatible(typehints.KV[int, int], d.element_type)
assert_that(d, equal_to([(1, 2), (2, 1), (3, 1)]))
@@ -1561,8 +1561,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('f', [1, 1, 2, 3])
- | combine.Count.PerElement('count elems'))
+ | 'f' >> beam.Create([1, 1, 2, 3])
+ | 'count elems' >> combine.Count.PerElement())
self.assertEqual('Pipeline type checking is enabled, however no output '
'type-hint was found for the PTransform '
@@ -1573,9 +1573,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | beam.Create('w', [True, True, False, True, True])
+ | 'w' >> beam.Create([True, True, False, True, True])
.with_output_types(bool)
- | combine.Count.PerElement('count elems'))
+ | 'count elems' >> combine.Count.PerElement())
self.assertCompatible(typehints.KV[bool, int], d.element_type)
assert_that(d, equal_to([(False, 1), (True, 4)]))
@@ -1583,8 +1583,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_top_of_pipeline_checking_satisfied(self):
d = (self.p
- | beam.Create('n', range(5, 11)).with_output_types(int)
- | combine.Top.Of('top 3', 3, lambda x, y: x < y))
+ | 'n' >> beam.Create(range(5, 11)).with_output_types(int)
+ | 'top 3' >> combine.Top.Of(3, lambda x, y: x < y))
self.assertCompatible(typehints.Iterable[int],
d.element_type)
@@ -1595,8 +1595,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | beam.Create('n', list('testing')).with_output_types(str)
- | combine.Top.Of('acii top', 3, lambda x, y: x < y))
+ | 'n' >> beam.Create(list('testing')).with_output_types(str)
+ | 'acii top' >> combine.Top.Of(3, lambda x, y: x < y))
self.assertCompatible(typehints.Iterable[str], d.element_type)
assert_that(d, equal_to([['t', 't', 's']]))
@@ -1605,9 +1605,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_per_key_pipeline_checking_violated(self):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('n', range(100)).with_output_types(int)
- | beam.Map('num + 1', lambda x: x + 1).with_output_types(int)
- | combine.Top.PerKey('top mod', 1, lambda a, b: a < b))
+ | 'n' >> beam.Create(range(100)).with_output_types(int)
+ | 'num + 1' >> beam.Map(lambda x: x + 1).with_output_types(int)
+ | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
self.assertEqual("Input type hint violation at GroupByKey: "
"expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -1616,10 +1616,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_per_key_pipeline_checking_satisfied(self):
d = (self.p
- | beam.Create('n', range(100)).with_output_types(int)
- | (beam.Map('group mod 3', lambda x: (x % 3, x))
+ | 'n' >> beam.Create(range(100)).with_output_types(int)
+ | 'group mod 3' >> (beam.Map(lambda x: (x % 3, x))
.with_output_types(typehints.KV[int, int]))
- | combine.Top.PerKey('top mod', 1, lambda a, b: a < b))
+ | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
self.assertCompatible(typehints.Tuple[int, typehints.Iterable[int]],
d.element_type)
@@ -1630,10 +1630,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | beam.Create('n', range(21))
- | (beam.Map('group mod 3', lambda x: (x % 3, x))
+ | 'n' >> beam.Create(range(21))
+ | 'group mod 3' >> (beam.Map(lambda x: (x % 3, x))
.with_output_types(typehints.KV[int, int]))
- | combine.Top.PerKey('top mod', 1, lambda a, b: a < b))
+ | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
d.element_type)
@@ -1642,8 +1642,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_sample_globally_pipeline_satisfied(self):
d = (self.p
- | beam.Create('m', [2, 2, 3, 3]).with_output_types(int)
- | combine.Sample.FixedSizeGlobally('sample', 3))
+ | 'm' >> beam.Create([2, 2, 3, 3]).with_output_types(int)
+ | 'sample' >> combine.Sample.FixedSizeGlobally(3))
self.assertCompatible(typehints.Iterable[int], d.element_type)
@@ -1658,8 +1658,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | beam.Create('m', [2, 2, 3, 3]).with_output_types(int)
- | combine.Sample.FixedSizeGlobally('sample', 2))
+ | 'm' >> beam.Create([2, 2, 3, 3]).with_output_types(int)
+ | 'sample' >> combine.Sample.FixedSizeGlobally(2))
self.assertCompatible(typehints.Iterable[int], d.element_type)
@@ -1672,9 +1672,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_sample_per_key_pipeline_satisfied(self):
d = (self.p
- | (beam.Create('m', [(1, 2), (1, 2), (2, 3), (2, 3)])
+ | 'm' >> (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
.with_output_types(typehints.KV[int, int]))
- | combine.Sample.FixedSizePerKey('sample', 2))
+ | 'sample' >> combine.Sample.FixedSizePerKey(2))
self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
d.element_type)
@@ -1691,9 +1691,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | (beam.Create('m', [(1, 2), (1, 2), (2, 3), (2, 3)])
+ | 'm' >> (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
.with_output_types(typehints.KV[int, int]))
- | combine.Sample.FixedSizePerKey('sample', 1))
+ | 'sample' >> combine.Sample.FixedSizePerKey(1))
self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
d.element_type)
@@ -1708,8 +1708,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_to_list_pipeline_check_satisfied(self):
d = (self.p
- | beam.Create('c', (1, 2, 3, 4)).with_output_types(int)
- | combine.ToList('to list'))
+ | 'c' >> beam.Create((1, 2, 3, 4)).with_output_types(int)
+ | 'to list' >> combine.ToList())
self.assertCompatible(typehints.List[int], d.element_type)
@@ -1724,8 +1724,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | beam.Create('c', list('test')).with_output_types(str)
- | combine.ToList('to list'))
+ | 'c' >> beam.Create(list('test')).with_output_types(str)
+ | 'to list' >> combine.ToList())
self.assertCompatible(typehints.List[str], d.element_type)
@@ -1739,8 +1739,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_to_dict_pipeline_check_violated(self):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create('d', [1, 2, 3, 4]).with_output_types(int)
- | combine.ToDict('to dict'))
+ | 'd' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+ | 'to dict' >> combine.ToDict())
self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
"requires Tuple[TypeVariable[K], "
@@ -1753,7 +1753,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| beam.Create(
'd',
[(1, 2), (3, 4)]).with_output_types(typehints.Tuple[int, int])
- | combine.ToDict('to dict'))
+ | 'to dict' >> combine.ToDict())
self.assertCompatible(typehints.Dict[int, int], d.element_type)
assert_that(d, equal_to([{1: 2, 3: 4}]))
@@ -1763,9 +1763,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | (beam.Create('d', [('1', 2), ('3', 4)])
+ | 'd' >> (beam.Create([('1', 2), ('3', 4)])
.with_output_types(typehints.Tuple[str, int]))
- | combine.ToDict('to dict'))
+ | 'to dict' >> combine.ToDict())
self.assertCompatible(typehints.Dict[str, int], d.element_type)
assert_that(d, equal_to([{'1': 2, '3': 4}]))
@@ -1776,8 +1776,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(TypeError) as e:
(self.p
- | beam.Create('t', [1, 2, 3]).with_output_types(int)
- | beam.Map('len', lambda x: len(x)).with_output_types(int))
+ | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+ | 'len' >> beam.Map(lambda x: len(x)).with_output_types(int))
self.p.run()
# Our special type-checking related TypeError shouldn't have been raised.
@@ -1799,8 +1799,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
beam.core.GroupByKeyOnly().infer_output_type(typehints.KV[str, int]))
def test_pipeline_inference(self):
- created = self.p | beam.Create('c', ['a', 'b', 'c'])
- mapped = created | beam.Map('pair with 1', lambda x: (x, 1))
+ created = self.p | 'c' >> beam.Create(['a', 'b', 'c'])
+ mapped = created | 'pair with 1' >> beam.Map(lambda x: (x, 1))
grouped = mapped | beam.GroupByKey()
self.assertEqual(str, created.element_type)
self.assertEqual(typehints.KV[str, int], mapped.element_type)
@@ -1810,8 +1810,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_inferred_bad_kv_type(self):
with self.assertRaises(typehints.TypeCheckError) as e:
_ = (self.p
- | beam.Create('t', ['a', 'b', 'c'])
- | beam.Map('ungroupable', lambda x: (x, 0, 1.0))
+ | 't' >> beam.Create(['a', 'b', 'c'])
+ | 'ungroupable' >> beam.Map(lambda x: (x, 0, 1.0))
| beam.GroupByKey())
self.assertEqual('Input type hint violation at GroupByKey: '
@@ -1821,11 +1821,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_type_inference_command_line_flag_toggle(self):
self.p.options.view_as(TypeOptions).pipeline_type_check = False
- x = self.p | beam.Create('t', [1, 2, 3, 4])
+ x = self.p | 't' >> beam.Create([1, 2, 3, 4])
self.assertIsNone(x.element_type)
self.p.options.view_as(TypeOptions).pipeline_type_check = True
- x = self.p | beam.Create('m', [1, 2, 3, 4])
+ x = self.p | 'm' >> beam.Create([1, 2, 3, 4])
self.assertEqual(int, x.element_type)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index b7a121d..bbb7787 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -160,9 +160,9 @@ def KvSwap(label='KvSwap'): # pylint: disable=invalid-name
def RemoveDuplicates(pcoll): # pylint: disable=invalid-name
"""Produces a PCollection containing the unique elements of a PCollection."""
return (pcoll
- | Map('ToPairs', lambda v: (v, None))
- | CombinePerKey('Group', lambda vs: None)
- | Keys('RemoveDuplicates'))
+ | 'ToPairs' >> Map(lambda v: (v, None))
+ | 'Group' >> CombinePerKey(lambda vs: None)
+ | 'RemoveDuplicates' >> Keys())
class DataflowAssertException(Exception):
@@ -220,7 +220,7 @@ def assert_that(actual, matcher, label='assert_that'):
class AssertThat(PTransform):
def apply(self, pipeline):
- return pipeline | Create('singleton', [None]) | Map(match, AllOf(actual))
+ return pipeline | 'singleton' >> Create([None]) | Map(match, AllOf(actual))
def default_label(self):
return label
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 186fcd4..012dde4 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -127,14 +127,14 @@ class WindowTest(unittest.TestCase):
self.assertEqual([IntervalWindow(2, 25)], merge(2, 15, 10))
def timestamped_key_values(self, pipeline, key, *timestamps):
- return (pipeline | Create('start', timestamps)
+ return (pipeline | 'start' >> Create(timestamps)
| Map(lambda x: WindowedValue((key, x), x, [])))
def test_sliding_windows(self):
p = Pipeline('DirectPipelineRunner')
pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3)
result = (pcoll
- | WindowInto('w', SlidingWindows(period=2, size=4))
+ | 'w' >> WindowInto(SlidingWindows(period=2, size=4))
| GroupByKey()
| reify_windows)
expected = [('key @ [-2.0, 2.0)', [1]),
@@ -147,7 +147,7 @@ class WindowTest(unittest.TestCase):
p = Pipeline('DirectPipelineRunner')
pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27)
result = (pcoll
- | WindowInto('w', Sessions(10))
+ | 'w' >> WindowInto(Sessions(10))
| GroupByKey()
| sort_values
| reify_windows)
@@ -159,9 +159,9 @@ class WindowTest(unittest.TestCase):
def test_timestamped_value(self):
p = Pipeline('DirectPipelineRunner')
result = (p
- | Create('start', [(k, k) for k in range(10)])
+ | 'start' >> Create([(k, k) for k in range(10)])
| Map(lambda (x, t): TimestampedValue(x, t))
- | WindowInto('w', FixedWindows(5))
+ | 'w' >> WindowInto(FixedWindows(5))
| Map(lambda v: ('key', v))
| GroupByKey())
assert_that(result, equal_to([('key', [0, 1, 2, 3, 4]),
@@ -172,13 +172,13 @@ class WindowTest(unittest.TestCase):
p = Pipeline('DirectPipelineRunner')
result = (p
# Create some initial test values.
- | Create('start', [(k, k) for k in range(10)])
+ | 'start' >> Create([(k, k) for k in range(10)])
# The purpose of the WindowInto transform is to establish a
# FixedWindows windowing function for the PCollection.
# It does not bucket elements into windows since the timestamps
# from Create are not spaced 5 ms apart and very likely they all
# fall into the same window.
- | WindowInto('w', FixedWindows(5))
+ | 'w' >> WindowInto(FixedWindows(5))
# Generate timestamped values using the values as timestamps.
# Now there are values 5 ms apart and since Map propagates the
# windowing function from input to output the output PCollection
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/write_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py
index 51163bc..af3668c 100644
--- a/sdks/python/apache_beam/transforms/write_ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py
@@ -100,7 +100,7 @@ class WriteTest(unittest.TestCase):
write_to_test_sink = WriteToTestSink(return_init_result,
return_write_results)
p = Pipeline(options=PipelineOptions([]))
- result = p | beam.Create('start', data) | write_to_test_sink
+ result = p | 'start' >> beam.Create(data) | write_to_test_sink
assert_that(result, is_empty())
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index b25f231..4e1ab68 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -80,7 +80,7 @@ class MainInputTest(unittest.TestCase):
['a', 'b', 'c'] | beam.ParDo(MyDoFn())
with self.assertRaises(typehints.TypeCheckError):
- [1, 2, 3] | (beam.ParDo(MyDoFn()) | beam.ParDo('again', MyDoFn()))
+ [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))
def test_typed_dofn_instance(self):
class MyDoFn(beam.DoFn):
@@ -95,7 +95,7 @@ class MainInputTest(unittest.TestCase):
['a', 'b', 'c'] | beam.ParDo(my_do_fn)
with self.assertRaises(typehints.TypeCheckError):
- [1, 2, 3] | (beam.ParDo(my_do_fn) | beam.ParDo('again', my_do_fn))
+ [1, 2, 3] | (beam.ParDo(my_do_fn) | 'again' >> beam.ParDo(my_do_fn))
class SideInputTest(unittest.TestCase):
@@ -170,14 +170,14 @@ class SideInputTest(unittest.TestCase):
return s * times
p = beam.Pipeline(options=PipelineOptions([]))
main_input = p | beam.Create(['a', 'bb', 'c'])
- side_input = p | beam.Create('side', [3])
+ side_input = p | 'side' >> beam.Create([3])
result = main_input | beam.Map(repeat, pvalue.AsSingleton(side_input))
assert_that(result, equal_to(['aaa', 'bbbbbb', 'ccc']))
p.run()
- bad_side_input = p | beam.Create('bad_side', ['z'])
+ bad_side_input = p | 'bad_side' >> beam.Create(['z'])
with self.assertRaises(typehints.TypeCheckError):
- main_input | beam.Map('again', repeat, pvalue.AsSingleton(bad_side_input))
+ main_input | 'again' >> beam.Map(repeat, pvalue.AsSingleton(bad_side_input))
def test_deferred_side_input_iterable(self):
@typehints.with_input_types(str, typehints.Iterable[str])
@@ -185,14 +185,14 @@ class SideInputTest(unittest.TestCase):
return glue.join(sorted(items))
p = beam.Pipeline(options=PipelineOptions([]))
main_input = p | beam.Create(['a', 'bb', 'c'])
- side_input = p | beam.Create('side', ['x', 'y', 'z'])
+ side_input = p | 'side' >> beam.Create(['x', 'y', 'z'])
result = main_input | beam.Map(concat, pvalue.AsIter(side_input))
assert_that(result, equal_to(['xayaz', 'xbbybbz', 'xcycz']))
p.run()
- bad_side_input = p | beam.Create('bad_side', [1, 2, 3])
+ bad_side_input = p | 'bad_side' >> beam.Create([1, 2, 3])
with self.assertRaises(typehints.TypeCheckError):
- main_input | beam.Map('fail', concat, pvalue.AsIter(bad_side_input))
+ main_input | 'fail' >> beam.Map(concat, pvalue.AsIter(bad_side_input))
class CustomTransformTest(unittest.TestCase):
[11/12] incubator-beam git commit: Final cleanup pass.
Posted by ro...@apache.org.
Final cleanup pass.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e3c078fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e3c078fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e3c078fe
Branch: refs/heads/python-sdk
Commit: e3c078fe28553b7e7317316b6df51b4c570573ba
Parents: c5b5b14
Author: Robert Bradshaw <ro...@google.com>
Authored: Sat Jul 23 01:32:23 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:46 2016 -0700
----------------------------------------------------------------------
.../examples/complete/autocomplete.py | 4 ++--
.../examples/complete/autocomplete_test.py | 4 ++--
.../apache_beam/examples/complete/estimate_pi.py | 5 ++---
.../examples/complete/top_wikipedia_sessions.py | 4 ++--
.../complete/top_wikipedia_sessions_test.py | 2 +-
.../apache_beam/examples/cookbook/bigshuffle.py | 12 +++++-------
.../examples/cookbook/custom_ptransform.py | 9 +++++----
.../examples/cookbook/custom_ptransform_test.py | 2 +-
.../apache_beam/examples/cookbook/filters.py | 4 ++--
.../examples/cookbook/group_with_coder.py | 9 +++++----
.../examples/cookbook/multiple_output_pardo.py | 14 +++++++-------
.../apache_beam/examples/snippets/snippets.py | 19 +++++++++----------
.../apache_beam/examples/streaming_wordcap.py | 4 ++--
13 files changed, 45 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index 10d9009..c3cd88f 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -48,8 +48,8 @@ def run(argv=None):
| 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
| 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
| 'TopPerPrefix' >> TopPerPrefix(5)
- | beam.Map('format',
- lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
+ | 'format' >> beam.Map(
+ lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
| 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 18d0511..0d20482 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -31,8 +31,8 @@ class AutocompleteTest(unittest.TestCase):
def test_top_prefixes(self):
p = beam.Pipeline('DirectPipelineRunner')
- words = p | 'create' >> beam.Create(self.WORDS)
- result = words | 'test' >> autocomplete.TopPerPrefix(5)
+ words = p | beam.Create(self.WORDS)
+ result = words | autocomplete.TopPerPrefix(5)
# values must be hashable for now
result = result | beam.Map(lambda (k, vs): (k, tuple(vs)))
assert_that(result, equal_to(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index c33db1d..37c1aad 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -112,9 +112,8 @@ def run(argv=None):
p = beam.Pipeline(options=pipeline_options)
(p # pylint: disable=expression-not-assigned
- | 'Estimate' >> EstimatePiTransform()
- | beam.io.Write('Write',
- beam.io.TextFileSink(known_args.output,
+ | EstimatePiTransform()
+ | beam.io.Write(beam.io.TextFileSink(known_args.output,
coder=JsonCoder())))
# Actually run the pipeline (all operations above are deferred).
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index 7468484..a48a383 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -168,9 +168,9 @@ def run(argv=None):
p = beam.Pipeline(options=pipeline_options)
(p # pylint: disable=expression-not-assigned
- | 'read' >> beam.Read(beam.io.TextFileSource(known_args.input))
+ | beam.Read(beam.io.TextFileSource(known_args.input))
| ComputeTopSessions(known_args.sampling_threshold)
- | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
+ | beam.io.Write(beam.io.TextFileSink(known_args.output)))
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
index 207d6c4..a84cc78 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -50,7 +50,7 @@ class ComputeTopSessionsTest(unittest.TestCase):
def test_compute_top_sessions(self):
p = beam.Pipeline('DirectPipelineRunner')
- edits = p | 'create' >> beam.Create(self.EDITS)
+ edits = p | beam.Create(self.EDITS)
result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
beam.assert_that(result, beam.equal_to(self.EXPECTED))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index f7070dc..a076a0c 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -54,8 +54,8 @@ def run(argv=None):
# Read the text file[pattern] into a PCollection.
lines = p | beam.io.Read(
- 'read', beam.io.TextFileSource(known_args.input,
- coder=beam.coders.BytesCoder()))
+ beam.io.TextFileSource(known_args.input,
+ coder=beam.coders.BytesCoder()))
# Count the occurrences of each word.
output = (lines
@@ -68,7 +68,7 @@ def run(argv=None):
lambda (key, vals): ['%s%s' % (key, val) for val in vals]))
# Write the output using a "Write" transform that has side effects.
- output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
+ output | beam.io.Write(beam.io.TextFileSink(known_args.output))
# Optionally write the input and output checksums.
if known_args.checksum_output:
@@ -76,16 +76,14 @@ def run(argv=None):
| 'input-csum' >> beam.Map(crc32line)
| 'combine-input-csum' >> beam.CombineGlobally(sum)
| 'hex-format' >> beam.Map(lambda x: '%x' % x))
- input_csum | beam.io.Write(
- 'write-input-csum',
+ input_csum | 'write-input-csum' >> beam.io.Write(
beam.io.TextFileSink(known_args.checksum_output + '-input'))
output_csum = (output
| 'output-csum' >> beam.Map(crc32line)
| 'combine-output-csum' >> beam.CombineGlobally(sum)
| 'hex-format-output' >> beam.Map(lambda x: '%x' % x))
- output_csum | beam.io.Write(
- 'write-output-csum',
+ output_csum | 'write-output-csum' >> beam.io.Write(
beam.io.TextFileSink(known_args.checksum_output + '-output'))
# Actually run the pipeline (all operations above are deferred).
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index 021eff6..ca13bbf 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -39,7 +39,7 @@ class Count1(beam.PTransform):
def apply(self, pcoll):
return (
pcoll
- | 'Init' >> beam.Map(lambda v: (v, 1))
+ | 'ParWithOne' >> beam.Map(lambda v: (v, 1))
| beam.CombinePerKey(sum))
@@ -47,7 +47,8 @@ def run_count1(known_args, options):
"""Runs the first example pipeline."""
logging.info('Running first pipeline')
p = beam.Pipeline(options=options)
- (p | beam.io.Read(beam.io.TextFileSource(known_args.input)) | Count1()
+ (p | beam.io.Read(beam.io.TextFileSource(known_args.input))
+ | Count1()
| beam.io.Write(beam.io.TextFileSink(known_args.output)))
p.run()
@@ -57,7 +58,7 @@ def Count2(pcoll): # pylint: disable=invalid-name
"""Count as a decorated function."""
return (
pcoll
- | 'Init' >> beam.Map(lambda v: (v, 1))
+ | 'PairWithOne' >> beam.Map(lambda v: (v, 1))
| beam.CombinePerKey(sum))
@@ -84,7 +85,7 @@ def Count3(pcoll, factor=1): # pylint: disable=invalid-name
"""
return (
pcoll
- | 'Init' >> beam.Map(lambda v: (v, factor))
+ | 'PairWithOne' >> beam.Map(lambda v: (v, factor))
| beam.CombinePerKey(sum))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index 603742f..806b031 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -40,7 +40,7 @@ class CustomCountTest(unittest.TestCase):
def run_pipeline(self, count_implementation, factor=1):
p = beam.Pipeline('DirectPipelineRunner')
- words = p | 'create' >> beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
+ words = p | beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
result = words | count_implementation
assert_that(
result, equal_to([('CAT', (3 * factor)), ('DOG', (2 * factor))]))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/filters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py
index efd0ba7..b3a969a 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters.py
@@ -67,8 +67,8 @@ def filter_cold_days(input_data, month_filter):
return (
fields_of_interest
| 'desired month' >> beam.Filter(lambda row: row['month'] == month_filter)
- | beam.Filter('below mean',
- lambda row, mean: row['mean_temp'] < mean, global_mean))
+ | 'below mean' >> beam.Filter(
+ lambda row, mean: row['mean_temp'] < mean, global_mean))
def run(argv=None):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index 6c86f61..651a4f3 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -98,19 +98,20 @@ def run(argv=sys.argv[1:]):
coders.registry.register_coder(Player, PlayerCoder)
(p # pylint: disable=expression-not-assigned
- | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
+ | beam.io.Read(beam.io.TextFileSource(known_args.input))
# The get_players function is annotated with a type hint above, so the type
# system knows the output type of the following operation is a key-value pair
# of a Player and an int. Please see the documentation for details on
# types that are inferred automatically as well as other ways to specify
# type hints.
- | 'get players' >> beam.Map(get_players)
+ | beam.Map(get_players)
# The output type hint of the previous step is used to infer that the key
# type of the following operation is the Player type. Since a custom coder
# is registered for the Player class above, a PlayerCoder will be used to
# encode Player objects as keys for this combine operation.
- | beam.CombinePerKey(sum) | beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
- | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
+ | beam.CombinePerKey(sum)
+ | beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
+ | beam.io.Write(beam.io.TextFileSink(known_args.output)))
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 187d20b..d24170e 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -137,7 +137,7 @@ def run(argv=None):
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)
- lines = p | 'read' >> beam.Read(beam.io.TextFileSource(known_args.input))
+ lines = p | beam.Read(beam.io.TextFileSource(known_args.input))
# with_outputs allows accessing the side outputs of a DoFn.
split_lines_result = (lines
@@ -158,20 +158,20 @@ def run(argv=None):
| 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
| beam.GroupByKey()
| 'count chars' >> beam.Map(lambda (_, counts): sum(counts))
- | beam.Write('write chars',
- beam.io.TextFileSink(known_args.output + '-chars')))
+ | 'write chars' >> beam.Write(
+ beam.io.TextFileSink(known_args.output + '-chars')))
# pylint: disable=expression-not-assigned
(short_words
| 'count short words' >> CountWords()
- | beam.Write('write short words',
- beam.io.TextFileSink(known_args.output + '-short-words')))
+ | 'write short words' >> beam.Write(
+ beam.io.TextFileSink(known_args.output + '-short-words')))
# pylint: disable=expression-not-assigned
(words
| 'count words' >> CountWords()
- | beam.Write('write words',
- beam.io.TextFileSink(known_args.output + '-words')))
+ | 'write words' >> beam.Write(
+ beam.io.TextFileSink(known_args.output + '-words')))
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 9f3d6e1..891f464 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -105,9 +105,8 @@ def construct_pipeline(renames):
# [START pipelines_constructing_writing]
filtered_words = reversed_words | 'FilterWords' >> beam.Filter(filter_words)
- filtered_words | beam.io.Write('WriteMyFile',
- beam.io.TextFileSink(
- 'gs://some/outputData.txt'))
+ filtered_words | 'WriteMyFile' >> beam.io.Write(
+ beam.io.TextFileSink('gs://some/outputData.txt'))
# [END pipelines_constructing_writing]
p.visit(SnippetUtils.RenameFiles(renames))
@@ -242,8 +241,8 @@ def pipeline_options_remote(argv):
options.view_as(StandardOptions).runner = 'DirectPipelineRunner'
p = Pipeline(options=options)
- lines = p | 'ReadFromText' >> beam.io.Read(beam.io.TextFileSource(my_input))
- lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(my_output))
+ lines = p | beam.io.Read(beam.io.TextFileSource(my_input))
+ lines | beam.io.Write(beam.io.TextFileSink(my_output))
p.run()
@@ -283,8 +282,8 @@ def pipeline_options_local(argv):
p = Pipeline(options=options)
# [END pipeline_options_local]
- lines = p | 'ReadFromText' >> beam.io.Read(beam.io.TextFileSource(my_input))
- lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(my_output))
+ lines = p | beam.io.Read(beam.io.TextFileSource(my_input))
+ lines | beam.io.Write(beam.io.TextFileSink(my_output))
p.run()
@@ -344,8 +343,8 @@ def pipeline_logging(lines, output):
p = beam.Pipeline(options=PipelineOptions())
(p
| beam.Create(lines)
- | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
- | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(output)))
+ | beam.ParDo(ExtractWordsFn())
+ | beam.io.Write(beam.io.TextFileSink(output)))
p.run()
@@ -1160,6 +1159,6 @@ class Count(beam.PTransform):
def apply(self, pcoll):
return (
pcoll
- | 'Init' >> beam.Map(lambda v: (v, 1))
+ | 'PairWithOne' >> beam.Map(lambda v: (v, 1))
| beam.CombinePerKey(sum))
# [END model_library_transforms_count]
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/streaming_wordcap.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py
index ef95a5f..d25ec3e 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcap.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcap.py
@@ -45,7 +45,7 @@ def run(argv=None):
# Read the text file[pattern] into a PCollection.
lines = p | beam.io.Read(
- 'read', beam.io.PubSubSource(known_args.input_topic))
+ beam.io.PubSubSource(known_args.input_topic))
# Capitalize the characters in each line.
transformed = (lines
@@ -54,7 +54,7 @@ def run(argv=None):
# Write to PubSub.
# pylint: disable=expression-not-assigned
transformed | beam.io.Write(
- 'pubsub_write', beam.io.PubSubSink(known_args.output_topic))
+ beam.io.PubSubSink(known_args.output_topic))
p.run()
[12/12] incubator-beam git commit: Closes #718
Posted by ro...@apache.org.
Closes #718
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/38d9dea2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/38d9dea2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/38d9dea2
Branch: refs/heads/python-sdk
Commit: 38d9dea2e62af280e4b9c258cedee70d6bcaa8ca
Parents: 9fe102a e3c078f
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Sat Jul 23 16:43:47 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:47 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/dataflow_test.py | 99 ++--
.../examples/complete/autocomplete.py | 16 +-
.../examples/complete/autocomplete_test.py | 4 +-
.../examples/complete/estimate_pi.py | 14 +-
.../examples/complete/estimate_pi_test.py | 2 +-
.../complete/juliaset/juliaset/juliaset.py | 11 +-
.../apache_beam/examples/complete/tfidf.py | 32 +-
.../apache_beam/examples/complete/tfidf_test.py | 2 +-
.../examples/complete/top_wikipedia_sessions.py | 8 +-
.../complete/top_wikipedia_sessions_test.py | 2 +-
.../examples/cookbook/bigquery_schema.py | 4 +-
.../examples/cookbook/bigquery_side_input.py | 18 +-
.../cookbook/bigquery_side_input_test.py | 12 +-
.../examples/cookbook/bigquery_tornadoes.py | 10 +-
.../cookbook/bigquery_tornadoes_test.py | 2 +-
.../apache_beam/examples/cookbook/bigshuffle.py | 31 +-
.../apache_beam/examples/cookbook/coders.py | 2 +-
.../examples/cookbook/coders_test.py | 4 +-
.../examples/cookbook/custom_ptransform.py | 9 +-
.../examples/cookbook/custom_ptransform_test.py | 2 +-
.../apache_beam/examples/cookbook/filters.py | 14 +-
.../examples/cookbook/filters_test.py | 2 +-
.../examples/cookbook/group_with_coder.py | 9 +-
.../examples/cookbook/mergecontacts.py | 8 +-
.../examples/cookbook/multiple_output_pardo.py | 30 +-
.../apache_beam/examples/snippets/snippets.py | 69 ++-
.../examples/snippets/snippets_test.py | 16 +-
.../apache_beam/examples/streaming_wordcap.py | 6 +-
.../apache_beam/examples/streaming_wordcount.py | 8 +-
sdks/python/apache_beam/examples/wordcount.py | 16 +-
.../apache_beam/examples/wordcount_debugging.py | 18 +-
.../apache_beam/examples/wordcount_minimal.py | 16 +-
sdks/python/apache_beam/io/avroio.py | 2 +-
sdks/python/apache_beam/io/bigquery.py | 4 +-
.../apache_beam/io/filebasedsource_test.py | 4 +-
sdks/python/apache_beam/io/iobase.py | 4 +-
sdks/python/apache_beam/pipeline.py | 13 +
sdks/python/apache_beam/pipeline_test.py | 48 +-
sdks/python/apache_beam/pvalue_test.py | 6 +-
.../consumer_tracking_pipeline_visitor_test.py | 4 +-
sdks/python/apache_beam/runners/runner_test.py | 6 +-
.../apache_beam/transforms/combiners_test.py | 64 ++-
sdks/python/apache_beam/transforms/core.py | 21 +-
.../python/apache_beam/transforms/ptransform.py | 9 +-
.../apache_beam/transforms/ptransform_test.py | 521 ++++++++++---------
sdks/python/apache_beam/transforms/util.py | 9 +-
.../apache_beam/transforms/window_test.py | 14 +-
.../transforms/write_ptransform_test.py | 2 +-
.../typehints/typed_pipeline_test.py | 16 +-
49 files changed, 628 insertions(+), 615 deletions(-)
----------------------------------------------------------------------
[07/12] incubator-beam git commit: Cleanup and fix combiners_test.
Posted by ro...@apache.org.
Cleanup and fix combiners_test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/01830510
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/01830510
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/01830510
Branch: refs/heads/python-sdk
Commit: 01830510bded3c22ddd96937f5d83547702a4385
Parents: 7c186ce
Author: Robert Bradshaw <ro...@google.com>
Authored: Fri Jul 22 16:05:23 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:46 2016 -0700
----------------------------------------------------------------------
.../apache_beam/transforms/combiners_test.py | 20 +++++++++-----------
1 file changed, 9 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01830510/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 0439fe1..c970382 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -79,12 +79,11 @@ class CombineTest(unittest.TestCase):
assert_that(result_cmp, equal_to([[9, 6, 6, 5, 3, 2]]), label='assert:cmp')
# Again for per-key combines.
- pcoll = pipeline | Create(
- 'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
+ pcoll = pipeline | 'start-perkye' >> Create(
+ [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
result_key_top = pcoll | 'top-perkey' >> combine.Top.LargestPerKey(5)
result_key_bot = pcoll | 'bot-perkey' >> combine.Top.SmallestPerKey(4)
- result_key_cmp = pcoll | combine.Top.PerKey(
- 'cmp-perkey',
+ result_key_cmp = pcoll | 'cmp-perkey' >> combine.Top.PerKey(
6,
lambda a, b, names: len(names[a]) < len(names[b]),
names) # Note parameter passed to comparator.
@@ -105,11 +104,11 @@ class CombineTest(unittest.TestCase):
assert_that(result_top, equal_to([[9, 6, 6, 5, 3]]), label='assert:top')
assert_that(result_bot, equal_to([[0, 1, 1, 1]]), label='assert:bot')
- pcoll = pipeline | Create(
- 'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
+ pcoll = pipeline | 'start-perkey' >> Create(
+ [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
result_ktop = pcoll | 'top-perkey' >> beam.CombinePerKey(combine.Largest(5))
- result_kbot = pcoll | beam.CombinePerKey(
- 'bot-perkey', combine.Smallest(4))
+ result_kbot = pcoll | 'bot-perkey' >> beam.CombinePerKey(
+ combine.Smallest(4))
assert_that(result_ktop, equal_to([('a', [9, 6, 6, 5, 3])]), label='k:top')
assert_that(result_kbot, equal_to([('a', [0, 1, 1, 1])]), label='k:bot')
pipeline.run()
@@ -138,8 +137,7 @@ class CombineTest(unittest.TestCase):
# Now test per-key samples.
pipeline = Pipeline('DirectPipelineRunner')
- pcoll = pipeline | Create(
- 'start-perkey',
+ pcoll = pipeline | 'start-perkey' >> Create(
sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), []))
result = pcoll | 'sample' >> combine.Sample.FixedSizePerKey(3)
@@ -158,7 +156,7 @@ class CombineTest(unittest.TestCase):
p = Pipeline('DirectPipelineRunner')
result = (
p
- | 'a' >> Create([(100, 0.0), ('b', 10, -1), ('c', 1, 100)])
+ | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
| beam.CombineGlobally(combine.TupleCombineFn(max,
combine.MeanCombineFn(),
sum)).without_defaults())
[05/12] incubator-beam git commit: fixup: failing tests expecting name
Posted by ro...@apache.org.
fixup: failing tests expecting name
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c5b5b14d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c5b5b14d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c5b5b14d
Branch: refs/heads/python-sdk
Commit: c5b5b14d35fc7f6b0a576f0fca19b730015e1282
Parents: b15d35c
Author: Robert Bradshaw <ro...@google.com>
Authored: Sat Jul 23 01:07:44 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:46 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/transforms/ptransform_test.py | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c5b5b14d/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 992f944..b99cd26 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -931,7 +931,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
| beam.Create([1, 2, 3]).with_output_types(int)
- | beam.GroupByKeyOnly())
+ | 'F' >> beam.GroupByKeyOnly())
self.assertEqual("Input type hint violation at F: "
"expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -945,7 +945,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
(self.p
| (beam.Create(range(5))
.with_output_types(typehints.Iterable[int]))
- | beam.GroupByKey())
+ | 'T' >> beam.GroupByKey())
self.assertEqual("Input type hint violation at T: "
"expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -1563,7 +1563,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create([1, 1, 2, 3])
+ | 'f' >> beam.Create([1, 1, 2, 3])
| 'count elems' >> combine.Count.PerElement())
self.assertEqual('Pipeline type checking is enabled, however no output '
[03/12] incubator-beam git commit: Move names out of transform
constructors.
Posted by ro...@apache.org.
Move names out of transform constructors.
sed -i -r 's/[|] (\S+)[(](["'"'"'][^"'"'"']+.)(, +|([)]))/| \2 >> \1(\4/g'
Small number of tests will need to be fixed by hand.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/031c4cce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/031c4cce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/031c4cce
Branch: refs/heads/python-sdk
Commit: 031c4cce0b9eae0d50a49f43ffeced1edbfd2f8f
Parents: 937cf69
Author: Robert Bradshaw <ro...@google.com>
Authored: Fri Jul 22 14:34:58 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:45 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/dataflow_test.py | 96 ++--
.../examples/complete/autocomplete.py | 8 +-
.../examples/complete/autocomplete_test.py | 4 +-
.../examples/complete/estimate_pi.py | 8 +-
.../examples/complete/estimate_pi_test.py | 2 +-
.../complete/juliaset/juliaset/juliaset.py | 8 +-
.../apache_beam/examples/complete/tfidf.py | 32 +-
.../apache_beam/examples/complete/tfidf_test.py | 2 +-
.../examples/complete/top_wikipedia_sessions.py | 8 +-
.../complete/top_wikipedia_sessions_test.py | 2 +-
.../examples/cookbook/bigquery_schema.py | 4 +-
.../examples/cookbook/bigquery_side_input.py | 6 +-
.../cookbook/bigquery_side_input_test.py | 8 +-
.../examples/cookbook/bigquery_tornadoes.py | 6 +-
.../cookbook/bigquery_tornadoes_test.py | 2 +-
.../apache_beam/examples/cookbook/bigshuffle.py | 18 +-
.../apache_beam/examples/cookbook/coders.py | 2 +-
.../examples/cookbook/coders_test.py | 4 +-
.../examples/cookbook/custom_ptransform.py | 6 +-
.../examples/cookbook/custom_ptransform_test.py | 2 +-
.../apache_beam/examples/cookbook/filters.py | 10 +-
.../examples/cookbook/filters_test.py | 2 +-
.../examples/cookbook/group_with_coder.py | 6 +-
.../examples/cookbook/mergecontacts.py | 8 +-
.../examples/cookbook/multiple_output_pardo.py | 18 +-
.../apache_beam/examples/snippets/snippets.py | 62 +--
.../examples/snippets/snippets_test.py | 10 +-
.../apache_beam/examples/streaming_wordcap.py | 2 +-
.../apache_beam/examples/streaming_wordcount.py | 8 +-
sdks/python/apache_beam/examples/wordcount.py | 14 +-
.../apache_beam/examples/wordcount_debugging.py | 16 +-
.../apache_beam/examples/wordcount_minimal.py | 14 +-
sdks/python/apache_beam/io/avroio.py | 2 +-
sdks/python/apache_beam/io/bigquery.py | 4 +-
.../apache_beam/io/filebasedsource_test.py | 4 +-
sdks/python/apache_beam/io/iobase.py | 4 +-
sdks/python/apache_beam/pipeline_test.py | 42 +-
sdks/python/apache_beam/pvalue_test.py | 6 +-
.../consumer_tracking_pipeline_visitor_test.py | 4 +-
sdks/python/apache_beam/runners/runner_test.py | 6 +-
.../apache_beam/transforms/combiners_test.py | 48 +-
sdks/python/apache_beam/transforms/core.py | 16 +-
.../python/apache_beam/transforms/ptransform.py | 2 +-
.../apache_beam/transforms/ptransform_test.py | 498 +++++++++----------
sdks/python/apache_beam/transforms/util.py | 8 +-
.../apache_beam/transforms/window_test.py | 14 +-
.../transforms/write_ptransform_test.py | 2 +-
.../typehints/typed_pipeline_test.py | 16 +-
48 files changed, 537 insertions(+), 537 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/dataflow_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
index 476f8b2..bf66851 100644
--- a/sdks/python/apache_beam/dataflow_test.py
+++ b/sdks/python/apache_beam/dataflow_test.py
@@ -54,33 +54,33 @@ class DataflowTest(unittest.TestCase):
def Count(pcoll): # pylint: disable=invalid-name, no-self-argument
"""A Count transform: v, ... => (v, n), ..."""
return (pcoll
- | Map('AddCount', lambda x: (x, 1))
- | GroupByKey('GroupCounts')
- | Map('AddCounts', lambda (x, ones): (x, sum(ones))))
+ | 'AddCount' >> Map(lambda x: (x, 1))
+ | 'GroupCounts' >> GroupByKey()
+ | 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones))))
def test_word_count(self):
pipeline = Pipeline('DirectPipelineRunner')
- lines = pipeline | Create('SomeWords', DataflowTest.SAMPLE_DATA)
+ lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA)
result = (
- (lines | FlatMap('GetWords', lambda x: re.findall(r'\w+', x)))
+ (lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x)))
.apply('CountWords', DataflowTest.Count))
assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
pipeline.run()
def test_map(self):
pipeline = Pipeline('DirectPipelineRunner')
- lines = pipeline | Create('input', ['a', 'b', 'c'])
+ lines = pipeline | 'input' >> Create(['a', 'b', 'c'])
result = (lines
- | Map('upper', str.upper)
- | Map('prefix', lambda x, prefix: prefix + x, 'foo-'))
+ | 'upper' >> Map(str.upper)
+ | 'prefix' >> Map(lambda x, prefix: prefix + x, 'foo-'))
assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C']))
pipeline.run()
def test_par_do_with_side_input_as_arg(self):
pipeline = Pipeline('DirectPipelineRunner')
words_list = ['aa', 'bb', 'cc']
- words = pipeline | Create('SomeWords', words_list)
- prefix = pipeline | Create('SomeString', ['xyz']) # side in
+ words = pipeline | 'SomeWords' >> Create(words_list)
+ prefix = pipeline | 'SomeString' >> Create(['xyz']) # side in
suffix = 'zyx'
result = words | FlatMap(
'DecorateWords',
@@ -92,9 +92,9 @@ class DataflowTest(unittest.TestCase):
def test_par_do_with_side_input_as_keyword_arg(self):
pipeline = Pipeline('DirectPipelineRunner')
words_list = ['aa', 'bb', 'cc']
- words = pipeline | Create('SomeWords', words_list)
+ words = pipeline | 'SomeWords' >> Create(words_list)
prefix = 'zyx'
- suffix = pipeline | Create('SomeString', ['xyz']) # side in
+ suffix = pipeline | 'SomeString' >> Create(['xyz']) # side in
result = words | FlatMap(
'DecorateWords',
lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
@@ -111,10 +111,10 @@ class DataflowTest(unittest.TestCase):
pipeline = Pipeline('DirectPipelineRunner')
words_list = ['aa', 'bb', 'cc']
- words = pipeline | Create('SomeWords', words_list)
+ words = pipeline | 'SomeWords' >> Create(words_list)
prefix = 'zyx'
- suffix = pipeline | Create('SomeString', ['xyz']) # side in
- result = words | ParDo('DecorateWordsDoFn', SomeDoFn(), prefix,
+ suffix = pipeline | 'SomeString' >> Create(['xyz']) # side in
+ result = words | 'DecorateWordsDoFn' >> ParDo(SomeDoFn(), prefix,
suffix=AsSingleton(suffix))
assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
pipeline.run()
@@ -131,7 +131,7 @@ class DataflowTest(unittest.TestCase):
yield SideOutputValue('odd', context.element)
pipeline = Pipeline('DirectPipelineRunner')
- nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
+ nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
results = nums | ParDo(
'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main')
assert_that(results.main, equal_to([1, 2, 3, 4]))
@@ -147,7 +147,7 @@ class DataflowTest(unittest.TestCase):
return [v, SideOutputValue('odd', v)]
pipeline = Pipeline('DirectPipelineRunner')
- nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
+ nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
results = nums | FlatMap(
'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main')
assert_that(results.main, equal_to([1, 2, 3, 4]))
@@ -157,37 +157,37 @@ class DataflowTest(unittest.TestCase):
def test_empty_singleton_side_input(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcol = pipeline | Create('start', [1, 2])
- side = pipeline | Create('side', []) # Empty side input.
+ pcol = pipeline | 'start' >> Create([1, 2])
+ side = pipeline | 'side' >> Create([]) # Empty side input.
def my_fn(k, s):
v = ('empty' if isinstance(s, EmptySideInput) else 'full')
return [(k, v)]
- result = pcol | FlatMap('compute', my_fn, AsSingleton(side))
+ result = pcol | 'compute' >> FlatMap(my_fn, AsSingleton(side))
assert_that(result, equal_to([(1, 'empty'), (2, 'empty')]))
pipeline.run()
def test_multi_valued_singleton_side_input(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcol = pipeline | Create('start', [1, 2])
- side = pipeline | Create('side', [3, 4]) # 2 values in side input.
- pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side)) # pylint: disable=expression-not-assigned
+ pcol = pipeline | 'start' >> Create([1, 2])
+ side = pipeline | 'side' >> Create([3, 4]) # 2 values in side input.
+ pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side)) # pylint: disable=expression-not-assigned
with self.assertRaises(ValueError):
pipeline.run()
def test_default_value_singleton_side_input(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcol = pipeline | Create('start', [1, 2])
- side = pipeline | Create('side', []) # 0 values in side input.
+ pcol = pipeline | 'start' >> Create([1, 2])
+ side = pipeline | 'side' >> Create([]) # 0 values in side input.
result = (
- pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side, 10)))
+ pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side, 10)))
assert_that(result, equal_to([10, 20]))
pipeline.run()
def test_iterable_side_input(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcol = pipeline | Create('start', [1, 2])
- side = pipeline | Create('side', [3, 4]) # 2 values in side input.
+ pcol = pipeline | 'start' >> Create([1, 2])
+ side = pipeline | 'side' >> Create([3, 4]) # 2 values in side input.
result = pcol | FlatMap('compute',
lambda x, s: [x * y for y in s], AllOf(side))
assert_that(result, equal_to([3, 4, 6, 8]))
@@ -195,7 +195,7 @@ class DataflowTest(unittest.TestCase):
def test_undeclared_side_outputs(self):
pipeline = Pipeline('DirectPipelineRunner')
- nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
+ nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
results = nums | FlatMap(
'ClassifyNumbers',
lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
@@ -210,7 +210,7 @@ class DataflowTest(unittest.TestCase):
def test_empty_side_outputs(self):
pipeline = Pipeline('DirectPipelineRunner')
- nums = pipeline | Create('Some Numbers', [1, 3, 5])
+ nums = pipeline | 'Some Numbers' >> Create([1, 3, 5])
results = nums | FlatMap(
'ClassifyNumbers',
lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
@@ -224,9 +224,9 @@ class DataflowTest(unittest.TestCase):
a_list = [5, 1, 3, 2, 9]
some_pairs = [('crouton', 17), ('supreme', None)]
pipeline = Pipeline('DirectPipelineRunner')
- main_input = pipeline | Create('main input', [1])
- side_list = pipeline | Create('side list', a_list)
- side_pairs = pipeline | Create('side pairs', some_pairs)
+ main_input = pipeline | 'main input' >> Create([1])
+ side_list = pipeline | 'side list' >> Create(a_list)
+ side_pairs = pipeline | 'side pairs' >> Create(some_pairs)
results = main_input | FlatMap(
'concatenate',
lambda x, the_list, the_dict: [[x, the_list, the_dict]],
@@ -248,8 +248,8 @@ class DataflowTest(unittest.TestCase):
# with the same defaults will return the same PCollectionView.
a_list = [2]
pipeline = Pipeline('DirectPipelineRunner')
- main_input = pipeline | Create('main input', [1])
- side_list = pipeline | Create('side list', a_list)
+ main_input = pipeline | 'main input' >> Create([1])
+ side_list = pipeline | 'side list' >> Create(a_list)
results = main_input | FlatMap(
'test',
lambda x, s1, s2: [[x, s1, s2]],
@@ -271,8 +271,8 @@ class DataflowTest(unittest.TestCase):
# distinct PCollectionViews with the same full_label.
a_list = [2]
pipeline = Pipeline('DirectPipelineRunner')
- main_input = pipeline | Create('main input', [1])
- side_list = pipeline | Create('side list', a_list)
+ main_input = pipeline | 'main input' >> Create([1])
+ side_list = pipeline | 'side list' >> Create(a_list)
with self.assertRaises(RuntimeError) as e:
_ = main_input | FlatMap(
@@ -287,8 +287,8 @@ class DataflowTest(unittest.TestCase):
def test_as_singleton_with_different_defaults_with_unique_labels(self):
a_list = []
pipeline = Pipeline('DirectPipelineRunner')
- main_input = pipeline | Create('main input', [1])
- side_list = pipeline | Create('side list', a_list)
+ main_input = pipeline | 'main input' >> Create([1])
+ side_list = pipeline | 'side list' >> Create(a_list)
results = main_input | FlatMap(
'test',
lambda x, s1, s2: [[x, s1, s2]],
@@ -311,8 +311,8 @@ class DataflowTest(unittest.TestCase):
# return the same PCollectionView.
a_list = [1, 2, 3]
pipeline = Pipeline('DirectPipelineRunner')
- main_input = pipeline | Create('main input', [1])
- side_list = pipeline | Create('side list', a_list)
+ main_input = pipeline | 'main input' >> Create([1])
+ side_list = pipeline | 'side list' >> Create(a_list)
results = main_input | FlatMap(
'test',
lambda x, ls1, ls2: [[x, ls1, ls2]],
@@ -332,8 +332,8 @@ class DataflowTest(unittest.TestCase):
def test_as_list_with_unique_labels(self):
a_list = [1, 2, 3]
pipeline = Pipeline('DirectPipelineRunner')
- main_input = pipeline | Create('main input', [1])
- side_list = pipeline | Create('side list', a_list)
+ main_input = pipeline | 'main input' >> Create([1])
+ side_list = pipeline | 'side list' >> Create(a_list)
results = main_input | FlatMap(
'test',
lambda x, ls1, ls2: [[x, ls1, ls2]],
@@ -353,8 +353,8 @@ class DataflowTest(unittest.TestCase):
def test_as_dict_with_unique_labels(self):
some_kvs = [('a', 1), ('b', 2)]
pipeline = Pipeline('DirectPipelineRunner')
- main_input = pipeline | Create('main input', [1])
- side_kvs = pipeline | Create('side kvs', some_kvs)
+ main_input = pipeline | 'main input' >> Create([1])
+ side_kvs = pipeline | 'side kvs' >> Create(some_kvs)
results = main_input | FlatMap(
'test',
lambda x, dct1, dct2: [[x, dct1, dct2]],
@@ -383,10 +383,10 @@ class DataflowTest(unittest.TestCase):
return existing_windows
pipeline = Pipeline('DirectPipelineRunner')
- numbers = pipeline | Create('KVs', [(1, 10), (2, 20), (3, 30)])
+ numbers = pipeline | 'KVs' >> Create([(1, 10), (2, 20), (3, 30)])
result = (numbers
- | WindowInto('W', windowfn=TestWindowFn())
- | GroupByKey('G'))
+ | 'W' >> WindowInto(windowfn=TestWindowFn())
+ | 'G' >> GroupByKey())
assert_that(
result, equal_to([(1, [10]), (1, [10]), (2, [20]),
(2, [20]), (3, [30]), (3, [30])]))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index 0f1e96e..b68bc56 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -45,12 +45,12 @@ def run(argv=None):
p = beam.Pipeline(options=pipeline_options)
(p # pylint: disable=expression-not-assigned
- | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
- | beam.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
- | TopPerPrefix('TopPerPrefix', 5)
+ | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
+ | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+ | 'TopPerPrefix' >> TopPerPrefix(5)
| beam.Map('format',
lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
- | beam.io.Write('write', beam.io.TextFileSink(known_args.output)))
+ | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 84f947b..18d0511 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -31,8 +31,8 @@ class AutocompleteTest(unittest.TestCase):
def test_top_prefixes(self):
p = beam.Pipeline('DirectPipelineRunner')
- words = p | beam.Create('create', self.WORDS)
- result = words | autocomplete.TopPerPrefix('test', 5)
+ words = p | 'create' >> beam.Create(self.WORDS)
+ result = words | 'test' >> autocomplete.TopPerPrefix(5)
# values must be hashable for now
result = result | beam.Map(lambda (k, vs): (k, tuple(vs)))
assert_that(result, equal_to(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index 09faecf..ef9f8cc 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -96,9 +96,9 @@ class EstimatePiTransform(beam.PTransform):
def apply(self, pcoll):
# A hundred work items of a hundred thousand tries each.
return (pcoll
- | beam.Create('Initialize', [100000] * 100).with_output_types(int)
- | beam.Map('Run trials', run_trials)
- | beam.CombineGlobally('Sum', combine_results).without_defaults())
+ | 'Initialize' >> beam.Create([100000] * 100).with_output_types(int)
+ | 'Run trials' >> beam.Map(run_trials)
+ | 'Sum' >> beam.CombineGlobally(combine_results).without_defaults())
def run(argv=None):
@@ -115,7 +115,7 @@ def run(argv=None):
p = beam.Pipeline(options=pipeline_options)
(p # pylint: disable=expression-not-assigned
- | EstimatePiTransform('Estimate')
+ | 'Estimate' >> EstimatePiTransform()
| beam.io.Write('Write',
beam.io.TextFileSink(known_args.output,
coder=JsonCoder())))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
index c633bb1..3967ed5 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
@@ -39,7 +39,7 @@ class EstimatePiTest(unittest.TestCase):
def test_basics(self):
p = beam.Pipeline('DirectPipelineRunner')
- result = p | estimate_pi.EstimatePiTransform('Estimate')
+ result = p | 'Estimate' >> estimate_pi.EstimatePiTransform()
# Note: Probabilistically speaking this test can fail with a probability
# that is very small (VERY) given that we run at least 10 million trials.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 2bc37e6..56696c3 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -50,7 +50,7 @@ def generate_julia_set_colors(pipeline, c, n, max_iterations):
yield (x, y)
julia_set_colors = (pipeline
- | beam.Create('add points', point_set(n))
+ | 'add points' >> beam.Create(point_set(n))
| beam.Map(
get_julia_set_point_color, c, n, max_iterations))
@@ -105,11 +105,11 @@ def run(argv=None): # pylint: disable=missing-docstring
# Group each coordinate triplet by its x value, then write the coordinates to
# the output file with an x-coordinate grouping per line.
# pylint: disable=expression-not-assigned
- (coordinates | beam.Map('x coord key', lambda (x, y, i): (x, (x, y, i)))
- | beam.GroupByKey('x coord') | beam.Map(
+ (coordinates | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i)))
+ | 'x coord' >> beam.GroupByKey() | beam.Map(
'format',
lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords))
- | beam.io.Write('write', beam.io.TextFileSink(known_args.coordinate_output)))
+ | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output)))
# pylint: enable=expression-not-assigned
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index ef58cc0..043d5f6 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -42,7 +42,7 @@ def read_documents(pipeline, uris):
pipeline
| beam.io.Read('read: %s' % uri, beam.io.TextFileSource(uri))
| beam.Map('withkey: %s' % uri, lambda v, uri: (uri, v), uri))
- return pcolls | beam.Flatten('flatten read pcolls')
+ return pcolls | 'flatten read pcolls' >> beam.Flatten()
class TfIdf(beam.PTransform):
@@ -59,9 +59,9 @@ class TfIdf(beam.PTransform):
# PCollection to use as side input.
total_documents = (
uri_to_content
- | beam.Keys('get uris')
- | beam.RemoveDuplicates('get unique uris')
- | beam.combiners.Count.Globally(' count uris'))
+ | 'get uris' >> beam.Keys()
+ | 'get unique uris' >> beam.RemoveDuplicates()
+ | ' count uris' >> beam.combiners.Count.Globally())
# Create a collection of pairs mapping a URI to each of the words
# in the document associated with that URI.
@@ -71,28 +71,28 @@ class TfIdf(beam.PTransform):
uri_to_words = (
uri_to_content
- | beam.FlatMap('split words', split_into_words))
+ | 'split words' >> beam.FlatMap(split_into_words))
# Compute a mapping from each word to the total number of documents
# in which it appears.
word_to_doc_count = (
uri_to_words
- | beam.RemoveDuplicates('get unique words per doc')
- | beam.Values('get words')
- | beam.combiners.Count.PerElement('count docs per word'))
+ | 'get unique words per doc' >> beam.RemoveDuplicates()
+ | 'get words' >> beam.Values()
+ | 'count docs per word' >> beam.combiners.Count.PerElement())
# Compute a mapping from each URI to the total number of words in the
# document associated with that URI.
uri_to_word_total = (
uri_to_words
- | beam.Keys(' get uris')
- | beam.combiners.Count.PerElement('count words in doc'))
+ | ' get uris' >> beam.Keys()
+ | 'count words in doc' >> beam.combiners.Count.PerElement())
# Count, for each (URI, word) pair, the number of occurrences of that word
# in the document associated with the URI.
uri_and_word_to_count = (
uri_to_words
- | beam.combiners.Count.PerElement('count word-doc pairs'))
+ | 'count word-doc pairs' >> beam.combiners.Count.PerElement())
# Adjust the above collection to a mapping from (URI, word) pairs to counts
# into an isomorphic mapping from URI to (word, count) pairs, to prepare
@@ -116,7 +116,7 @@ class TfIdf(beam.PTransform):
# ... ]}
uri_to_word_and_count_and_total = (
{'word totals': uri_to_word_total, 'word counts': uri_to_word_and_count}
- | beam.CoGroupByKey('cogroup by uri'))
+ | 'cogroup by uri' >> beam.CoGroupByKey())
# Compute a mapping from each word to a (URI, term frequency) pair for each
# URI. A word's term frequency for a document is simply the number of times
@@ -132,7 +132,7 @@ class TfIdf(beam.PTransform):
word_to_uri_and_tf = (
uri_to_word_and_count_and_total
- | beam.FlatMap('compute term frequencies', compute_term_frequency))
+ | 'compute term frequencies' >> beam.FlatMap(compute_term_frequency))
# Compute a mapping from each word to its document frequency.
# A word's document frequency in a corpus is the number of
@@ -155,7 +155,7 @@ class TfIdf(beam.PTransform):
# each keyed on the word.
word_to_uri_and_tf_and_df = (
{'tf': word_to_uri_and_tf, 'df': word_to_df}
- | beam.CoGroupByKey('cogroup words by tf-df'))
+ | 'cogroup words by tf-df' >> beam.CoGroupByKey())
# Compute a mapping from each word to a (URI, TF-IDF) score for each URI.
# There are a variety of definitions of TF-IDF
@@ -170,7 +170,7 @@ class TfIdf(beam.PTransform):
word_to_uri_and_tfidf = (
word_to_uri_and_tf_and_df
- | beam.FlatMap('compute tf-idf', compute_tf_idf))
+ | 'compute tf-idf' >> beam.FlatMap(compute_tf_idf))
return word_to_uri_and_tfidf
@@ -197,7 +197,7 @@ def run(argv=None):
output = pcoll | TfIdf()
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
- output | beam.io.Write('write', beam.io.TextFileSink(known_args.output))
+ output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index 8f52611..ee7e534 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -56,7 +56,7 @@ class TfIdfTest(unittest.TestCase):
result = (
uri_to_line
| tfidf.TfIdf()
- | beam.Map('flatten', lambda (word, (uri, tfidf)): (word, uri, tfidf)))
+ | 'flatten' >> beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
beam.assert_that(result, beam.equal_to(EXPECTED_RESULTS))
# Run the pipeline. Note that the assert_that above adds to the pipeline
# a check that the result PCollection contains expected values. To actually
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index c46bfc5..7468484 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -134,9 +134,9 @@ class ComputeTopSessions(beam.PTransform):
| beam.Filter(lambda x: (abs(hash(x)) <=
MAX_TIMESTAMP * self.sampling_threshold))
| ComputeSessions()
- | beam.ParDo('SessionsToStrings', SessionsToStringsDoFn())
+ | 'SessionsToStrings' >> beam.ParDo(SessionsToStringsDoFn())
| TopPerMonth()
- | beam.ParDo('FormatOutput', FormatOutputDoFn()))
+ | 'FormatOutput' >> beam.ParDo(FormatOutputDoFn()))
def run(argv=None):
@@ -168,9 +168,9 @@ def run(argv=None):
p = beam.Pipeline(options=pipeline_options)
(p # pylint: disable=expression-not-assigned
- | beam.Read('read', beam.io.TextFileSource(known_args.input))
+ | 'read' >> beam.Read(beam.io.TextFileSource(known_args.input))
| ComputeTopSessions(known_args.sampling_threshold)
- | beam.io.Write('write', beam.io.TextFileSink(known_args.output)))
+ | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
index fb48641..207d6c4 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -50,7 +50,7 @@ class ComputeTopSessionsTest(unittest.TestCase):
def test_compute_top_sessions(self):
p = beam.Pipeline('DirectPipelineRunner')
- edits = p | beam.Create('create', self.EDITS)
+ edits = p | 'create' >> beam.Create(self.EDITS)
result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
beam.assert_that(result, beam.equal_to(self.EXPECTED))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
index 7c420fb..650a886 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
@@ -111,8 +111,8 @@ def run(argv=None):
}
# pylint: disable=expression-not-assigned
- record_ids = p | beam.Create('CreateIDs', ['1', '2', '3', '4', '5'])
- records = record_ids | beam.Map('CreateRecords', create_random_record)
+ record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5'])
+ records = record_ids | 'CreateRecords' >> beam.Map(create_random_record)
records | beam.io.Write(
'write',
beam.io.BigQuerySink(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index e1d9cf1..1db4a1e 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -105,9 +105,9 @@ def run(argv=None):
beam.io.BigQuerySource(query=query_corpus))
pcoll_word = p | beam.Read('read words',
beam.io.BigQuerySource(query=query_word))
- pcoll_ignore_corpus = p | beam.Create('create_ignore_corpus', [ignore_corpus])
- pcoll_ignore_word = p | beam.Create('create_ignore_word', [ignore_word])
- pcoll_group_ids = p | beam.Create('create groups', group_ids)
+ pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create([ignore_corpus])
+ pcoll_ignore_word = p | 'create_ignore_word' >> beam.Create([ignore_word])
+ pcoll_group_ids = p | 'create groups' >> beam.Create(group_ids)
pcoll_groups = create_groups(pcoll_group_ids, pcoll_corpus, pcoll_word,
pcoll_ignore_corpus, pcoll_ignore_word)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index bc75c41..215aafa 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -29,16 +29,16 @@ class BigQuerySideInputTest(unittest.TestCase):
def test_create_groups(self):
p = beam.Pipeline('DirectPipelineRunner')
- group_ids_pcoll = p | beam.Create('create_group_ids', ['A', 'B', 'C'])
+ group_ids_pcoll = p | 'create_group_ids' >> beam.Create(['A', 'B', 'C'])
corpus_pcoll = p | beam.Create('create_corpus',
[{'f': 'corpus1'},
{'f': 'corpus2'},
{'f': 'corpus3'}])
- words_pcoll = p | beam.Create('create_words', [{'f': 'word1'},
+ words_pcoll = p | 'create_words' >> beam.Create([{'f': 'word1'},
{'f': 'word2'},
{'f': 'word3'}])
- ignore_corpus_pcoll = p | beam.Create('create_ignore_corpus', ['corpus1'])
- ignore_word_pcoll = p | beam.Create('create_ignore_word', ['word1'])
+ ignore_corpus_pcoll = p | 'create_ignore_corpus' >> beam.Create(['corpus1'])
+ ignore_word_pcoll = p | 'create_ignore_word' >> beam.Create(['word1'])
groups = bigquery_side_input.create_groups(group_ids_pcoll, corpus_pcoll,
words_pcoll, ignore_corpus_pcoll,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index e732309..cdaee36 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -56,8 +56,8 @@ def count_tornadoes(input_data):
| beam.FlatMap(
'months with tornadoes',
lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
- | beam.CombinePerKey('monthly count', sum)
- | beam.Map('format', lambda (k, v): {'month': k, 'tornado_count': v}))
+ | 'monthly count' >> beam.CombinePerKey(sum)
+ | 'format' >> beam.Map(lambda (k, v): {'month': k, 'tornado_count': v}))
def run(argv=None):
@@ -77,7 +77,7 @@ def run(argv=None):
p = beam.Pipeline(argv=pipeline_args)
# Read the table rows into a PCollection.
- rows = p | beam.io.Read('read', beam.io.BigQuerySource(known_args.input))
+ rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input))
counts = count_tornadoes(rows)
# Write the output using a "Write" transform that has side effects.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
index 2547849..87e1f44 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
@@ -28,7 +28,7 @@ class BigQueryTornadoesTest(unittest.TestCase):
def test_basics(self):
p = beam.Pipeline('DirectPipelineRunner')
- rows = (p | beam.Create('create', [
+ rows = (p | 'create' >> beam.Create([
{'month': 1, 'day': 1, 'tornado': False},
{'month': 1, 'day': 2, 'tornado': True},
{'month': 1, 'day': 3, 'tornado': True},
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index cde00b3..c29a038 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -59,30 +59,30 @@ def run(argv=None):
# Count the occurrences of each word.
output = (lines
- | beam.Map('split', lambda x: (x[:10], x[10:99])
+ | 'split' >> beam.Map(lambda x: (x[:10], x[10:99])
).with_output_types(beam.typehints.KV[str, str])
- | beam.GroupByKey('group')
+ | 'group' >> beam.GroupByKey()
| beam.FlatMap(
'format',
lambda (key, vals): ['%s%s' % (key, val) for val in vals]))
# Write the output using a "Write" transform that has side effects.
- output | beam.io.Write('write', beam.io.TextFileSink(known_args.output))
+ output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
# Optionally write the input and output checksums.
if known_args.checksum_output:
input_csum = (lines
- | beam.Map('input-csum', crc32line)
- | beam.CombineGlobally('combine-input-csum', sum)
- | beam.Map('hex-format', lambda x: '%x' % x))
+ | 'input-csum' >> beam.Map(crc32line)
+ | 'combine-input-csum' >> beam.CombineGlobally(sum)
+ | 'hex-format' >> beam.Map(lambda x: '%x' % x))
input_csum | beam.io.Write(
'write-input-csum',
beam.io.TextFileSink(known_args.checksum_output + '-input'))
output_csum = (output
- | beam.Map('output-csum', crc32line)
- | beam.CombineGlobally('combine-output-csum', sum)
- | beam.Map('hex-format-output', lambda x: '%x' % x))
+ | 'output-csum' >> beam.Map(crc32line)
+ | 'combine-output-csum' >> beam.CombineGlobally(sum)
+ | 'hex-format-output' >> beam.Map(lambda x: '%x' % x))
output_csum | beam.io.Write(
'write-output-csum',
beam.io.TextFileSink(known_args.checksum_output + '-output'))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders.py b/sdks/python/apache_beam/examples/cookbook/coders.py
index 1ce1fa5..bbe02b3 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders.py
@@ -89,7 +89,7 @@ def run(argv=None):
(p # pylint: disable=expression-not-assigned
| beam.io.Read('read',
beam.io.TextFileSource(known_args.input, coder=JsonCoder()))
- | beam.FlatMap('points', compute_points)
+ | 'points' >> beam.FlatMap(compute_points)
| beam.CombinePerKey(sum)
| beam.io.Write('write',
beam.io.TextFileSink(known_args.output, coder=JsonCoder())))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py
index 5840081..75b78c8 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py
@@ -35,9 +35,9 @@ class CodersTest(unittest.TestCase):
def test_compute_points(self):
p = beam.Pipeline('DirectPipelineRunner')
- records = p | beam.Create('create', self.SAMPLE_RECORDS)
+ records = p | 'create' >> beam.Create(self.SAMPLE_RECORDS)
result = (records
- | beam.FlatMap('points', coders.compute_points)
+ | 'points' >> beam.FlatMap(coders.compute_points)
| beam.CombinePerKey(sum))
assert_that(result, equal_to([('Italy', 0), ('Brasil', 6), ('Germany', 3)]))
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index d3d8b08..021eff6 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -39,7 +39,7 @@ class Count1(beam.PTransform):
def apply(self, pcoll):
return (
pcoll
- | beam.Map('Init', lambda v: (v, 1))
+ | 'Init' >> beam.Map(lambda v: (v, 1))
| beam.CombinePerKey(sum))
@@ -57,7 +57,7 @@ def Count2(pcoll): # pylint: disable=invalid-name
"""Count as a decorated function."""
return (
pcoll
- | beam.Map('Init', lambda v: (v, 1))
+ | 'Init' >> beam.Map(lambda v: (v, 1))
| beam.CombinePerKey(sum))
@@ -84,7 +84,7 @@ def Count3(pcoll, factor=1): # pylint: disable=invalid-name
"""
return (
pcoll
- | beam.Map('Init', lambda v: (v, factor))
+ | 'Init' >> beam.Map(lambda v: (v, factor))
| beam.CombinePerKey(sum))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index 3c0c6f3..603742f 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -40,7 +40,7 @@ class CustomCountTest(unittest.TestCase):
def run_pipeline(self, count_implementation, factor=1):
p = beam.Pipeline('DirectPipelineRunner')
- words = p | beam.Create('create', ['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
+ words = p | 'create' >> beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
result = words | count_implementation
assert_that(
result, equal_to([('CAT', (3 * factor)), ('DOG', (2 * factor))]))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/filters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py
index c309941..b19b566 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters.py
@@ -59,14 +59,14 @@ def filter_cold_days(input_data, month_filter):
# Compute the global mean temperature.
global_mean = AsSingleton(
fields_of_interest
- | beam.Map('extract mean', lambda row: row['mean_temp'])
- | beam.combiners.Mean.Globally('global mean'))
+ | 'extract mean' >> beam.Map(lambda row: row['mean_temp'])
+ | 'global mean' >> beam.combiners.Mean.Globally())
# Filter to the rows representing days in the month of interest
# in which the mean daily temperature is below the global mean.
return (
fields_of_interest
- | beam.Filter('desired month', lambda row: row['month'] == month_filter)
+ | 'desired month' >> beam.Filter(lambda row: row['month'] == month_filter)
| beam.Filter('below mean',
lambda row, mean: row['mean_temp'] < mean, global_mean))
@@ -88,11 +88,11 @@ def run(argv=None):
p = beam.Pipeline(argv=pipeline_args)
- input_data = p | beam.Read('input', beam.io.BigQuerySource(known_args.input))
+ input_data = p | 'input' >> beam.Read(beam.io.BigQuerySource(known_args.input))
# pylint: disable=expression-not-assigned
(filter_cold_days(input_data, known_args.month_filter)
- | beam.io.Write('save to BQ', beam.io.BigQuerySink(
+ | 'save to BQ' >> beam.io.Write(beam.io.BigQuerySink(
known_args.output,
schema='year:INTEGER,month:INTEGER,day:INTEGER,mean_temp:FLOAT',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/filters_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters_test.py b/sdks/python/apache_beam/examples/cookbook/filters_test.py
index cf1ca7e..9e5592f 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters_test.py
@@ -36,7 +36,7 @@ class FiltersTest(unittest.TestCase):
def _get_result_for_month(self, month):
p = beam.Pipeline('DirectPipelineRunner')
- rows = (p | beam.Create('create', self.input_data))
+ rows = (p | 'create' >> beam.Create(self.input_data))
results = filters.filter_cold_days(rows, month)
return results
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index 140314e..6c86f61 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -98,19 +98,19 @@ def run(argv=sys.argv[1:]):
coders.registry.register_coder(Player, PlayerCoder)
(p # pylint: disable=expression-not-assigned
- | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
+ | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
# The get_players function is annotated with a type hint above, so the type
# system knows the output type of the following operation is a key-value pair
# of a Player and an int. Please see the documentation for details on
# types that are inferred automatically as well as other ways to specify
# type hints.
- | beam.Map('get players', get_players)
+ | 'get players' >> beam.Map(get_players)
# The output type hint of the previous step is used to infer that the key
# type of the following operation is the Player type. Since a custom coder
# is registered for the Player class above, a PlayerCoder will be used to
# encode Player objects as keys for this combine operation.
| beam.CombinePerKey(sum) | beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
- | beam.io.Write('write', beam.io.TextFileSink(known_args.output)))
+ | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 9e6b001..bf6d1b1 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -88,7 +88,7 @@ def run(argv=None, assert_results=None):
known_args.input_snailmail))
# Group together all entries under the same name.
- grouped = (email, phone, snailmail) | beam.CoGroupByKey('group_by_name')
+ grouped = (email, phone, snailmail) | 'group_by_name' >> beam.CoGroupByKey()
# Prepare tab-delimited output; something like this:
# "name"<TAB>"email_1,email_2"<TAB>"phone"<TAB>"first_snailmail_only"
@@ -107,9 +107,9 @@ def run(argv=None, assert_results=None):
nomads = grouped | beam.Filter( # People without addresses.
lambda (name, (email, phone, snailmail)): not next(iter(snailmail), None))
- num_luddites = luddites | beam.combiners.Count.Globally('luddites')
- num_writers = writers | beam.combiners.Count.Globally('writers')
- num_nomads = nomads | beam.combiners.Count.Globally('nomads')
+ num_luddites = luddites | 'luddites' >> beam.combiners.Count.Globally()
+ num_writers = writers | 'writers' >> beam.combiners.Count.Globally()
+ num_nomads = nomads | 'nomads' >> beam.combiners.Count.Globally()
# Write tab-delimited output.
# pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 5bde591..187d20b 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -114,10 +114,10 @@ class CountWords(beam.PTransform):
def apply(self, pcoll):
return (pcoll
- | beam.Map('pair_with_one', lambda x: (x, 1))
- | beam.GroupByKey('group')
- | beam.Map('count', lambda (word, ones): (word, sum(ones)))
- | beam.Map('format', lambda (word, c): '%s: %s' % (word, c)))
+ | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+ | 'group' >> beam.GroupByKey()
+ | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
+ | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)))
def run(argv=None):
@@ -137,7 +137,7 @@ def run(argv=None):
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)
- lines = p | beam.Read('read', beam.io.TextFileSource(known_args.input))
+ lines = p | 'read' >> beam.Read(beam.io.TextFileSource(known_args.input))
# with_outputs allows accessing the side outputs of a DoFn.
split_lines_result = (lines
@@ -155,21 +155,21 @@ def run(argv=None):
# pylint: disable=expression-not-assigned
(character_count
- | beam.Map('pair_with_key', lambda x: ('chars_temp_key', x))
+ | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
| beam.GroupByKey()
- | beam.Map('count chars', lambda (_, counts): sum(counts))
+ | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))
| beam.Write('write chars',
beam.io.TextFileSink(known_args.output + '-chars')))
# pylint: disable=expression-not-assigned
(short_words
- | CountWords('count short words')
+ | 'count short words' >> CountWords()
| beam.Write('write short words',
beam.io.TextFileSink(known_args.output + '-short-words')))
# pylint: disable=expression-not-assigned
(words
- | CountWords('count words')
+ | 'count words' >> CountWords()
| beam.Write('write words',
beam.io.TextFileSink(known_args.output + '-words')))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 3658619..c605db8 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -104,7 +104,7 @@ def construct_pipeline(renames):
# [END pipelines_constructing_applying]
# [START pipelines_constructing_writing]
- filtered_words = reversed_words | beam.Filter('FilterWords', filter_words)
+ filtered_words = reversed_words | 'FilterWords' >> beam.Filter(filter_words)
filtered_words | beam.io.Write('WriteMyFile',
beam.io.TextFileSink(
'gs://some/outputData.txt'))
@@ -242,8 +242,8 @@ def pipeline_options_remote(argv):
options.view_as(StandardOptions).runner = 'DirectPipelineRunner'
p = Pipeline(options=options)
- lines = p | beam.io.Read('ReadFromText', beam.io.TextFileSource(my_input))
- lines | beam.io.Write('WriteToText', beam.io.TextFileSink(my_output))
+ lines = p | 'ReadFromText' >> beam.io.Read(beam.io.TextFileSource(my_input))
+ lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(my_output))
p.run()
@@ -283,8 +283,8 @@ def pipeline_options_local(argv):
p = Pipeline(options=options)
# [END pipeline_options_local]
- lines = p | beam.io.Read('ReadFromText', beam.io.TextFileSource(my_input))
- lines | beam.io.Write('WriteToText', beam.io.TextFileSink(my_output))
+ lines = p | 'ReadFromText' >> beam.io.Read(beam.io.TextFileSource(my_input))
+ lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(my_output))
p.run()
@@ -307,7 +307,7 @@ def pipeline_options_command_line(argv):
p = beam.Pipeline(argv=pipeline_args)
lines = p | beam.io.Read('ReadFromText',
beam.io.TextFileSource(known_args.input))
- lines | beam.io.Write('WriteToText', beam.io.TextFileSink(known_args.output))
+ lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
# [END pipeline_options_command_line]
p.run()
@@ -344,8 +344,8 @@ def pipeline_logging(lines, output):
p = beam.Pipeline(options=PipelineOptions())
(p
| beam.Create(lines)
- | beam.ParDo('ExtractWords', ExtractWordsFn())
- | beam.io.Write('WriteToText', beam.io.TextFileSink(output)))
+ | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
+ | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(output)))
p.run()
@@ -391,11 +391,11 @@ def pipeline_monitoring(renames):
def apply(self, pcoll):
return (pcoll
# Convert lines of text into individual words.
- | beam.ParDo('ExtractWords', ExtractWordsFn())
+ | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
# Count the number of times each word occurs.
| beam.combiners.Count.PerElement()
# Format each word and count into a printable string.
- | beam.ParDo('FormatCounts', FormatCountsFn()))
+ | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))
# [END pipeline_monitoring_composite]
pipeline_options = PipelineOptions()
@@ -405,11 +405,11 @@ def pipeline_monitoring(renames):
# [START pipeline_monitoring_execution]
(p
# Read the lines of the input text.
- | beam.io.Read('ReadLines', beam.io.TextFileSource(options.input))
+ | 'ReadLines' >> beam.io.Read(beam.io.TextFileSource(options.input))
# Count the words.
| CountWords()
# Write the formatted word counts to output.
- | beam.io.Write('WriteCounts', beam.io.TextFileSink(options.output)))
+ | 'WriteCounts' >> beam.io.Write(beam.io.TextFileSink(options.output)))
# [END pipeline_monitoring_execution]
p.visit(SnippetUtils.RenameFiles(renames))
@@ -454,7 +454,7 @@ def examples_wordcount_minimal(renames):
# [END examples_wordcount_minimal_read]
# [START examples_wordcount_minimal_pardo]
- | beam.FlatMap('ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x))
+ | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
# [END examples_wordcount_minimal_pardo]
# [START examples_wordcount_minimal_count]
@@ -466,7 +466,7 @@ def examples_wordcount_minimal(renames):
# [END examples_wordcount_minimal_map]
# [START examples_wordcount_minimal_write]
- | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
+ | 'gs://my-bucket/counts.txt' >> beam.io.Write(beam.io.TextFileSink())
# [END examples_wordcount_minimal_write]
)
@@ -531,7 +531,7 @@ def examples_wordcount_wordcount(renames):
formatted = counts | beam.ParDo(FormatAsTextFn())
# [END examples_wordcount_wordcount_dofn]
- formatted | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
+ formatted | 'gs://my-bucket/counts.txt' >> beam.io.Write(beam.io.TextFileSink())
p.visit(SnippetUtils.RenameFiles(renames))
p.run()
@@ -591,9 +591,9 @@ def examples_wordcount_debugging(renames):
p
| beam.io.Read(beam.io.TextFileSource(
'gs://dataflow-samples/shakespeare/kinglear.txt'))
- | beam.FlatMap('ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x))
+ | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
| beam.combiners.Count.PerElement()
- | beam.ParDo('FilterText', FilterTextFn('Flourish|stomach')))
+ | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
# [START example_wordcount_debugging_assert]
beam.assert_that(
@@ -601,7 +601,7 @@ def examples_wordcount_debugging(renames):
# [END example_wordcount_debugging_assert]
output = (filtered_words
- | beam.Map('format', lambda (word, c): '%s: %s' % (word, c))
+ | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
| beam.io.Write(
'write', beam.io.TextFileSink('gs://my-bucket/counts.txt')))
@@ -682,7 +682,7 @@ def model_custom_source(count):
# Using the source in an example pipeline.
# [START model_custom_source_use_new_source]
p = beam.Pipeline(options=PipelineOptions())
- numbers = p | beam.io.Read('ProduceNumbers', CountingSource(count))
+ numbers = p | 'ProduceNumbers' >> beam.io.Read(CountingSource(count))
# [END model_custom_source_use_new_source]
lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
@@ -712,7 +712,7 @@ def model_custom_source(count):
# [START model_custom_source_use_ptransform]
p = beam.Pipeline(options=PipelineOptions())
- numbers = p | ReadFromCountingSource('ProduceNumbers', count)
+ numbers = p | 'ProduceNumbers' >> ReadFromCountingSource(count)
# [END model_custom_source_use_ptransform]
lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
@@ -848,7 +848,7 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
# [START model_custom_sink_use_ptransform]
p = beam.Pipeline(options=PipelineOptions())
- kvs = p | beam.core.Create('CreateKVs', KVs)
+ kvs = p | 'CreateKVs' >> beam.core.Create(KVs)
kvs | WriteToKVSink('WriteToSimpleKV',
'http://url_to_simple_kv/', final_table_name)
# [END model_custom_sink_use_ptransform]
@@ -880,7 +880,7 @@ def model_textio(renames):
# [END model_textio_read]
# [START model_textio_write]
- filtered_words = lines | beam.FlatMap('FilterWords', filter_words)
+ filtered_words = lines | 'FilterWords' >> beam.FlatMap(filter_words)
# [START model_pipelineio_write]
filtered_words | beam.io.Write(
'WriteToText', beam.io.TextFileSink('gs://my_bucket/path/to/numbers',
@@ -1053,7 +1053,7 @@ def model_group_by_key(contents, output_path):
p
| beam.Create(contents)
| beam.FlatMap(lambda x: re.findall(r'\w+', x))
- | beam.Map('one word', lambda w: (w, 1)))
+ | 'one word' >> beam.Map(lambda w: (w, 1)))
# GroupByKey accepts a PCollection of (w, 1) and
# outputs a PCollection of (w, (1, 1, ...)).
# (A key/value pair is just a tuple in Python.)
@@ -1063,7 +1063,7 @@ def model_group_by_key(contents, output_path):
grouped_words = words_and_counts | beam.GroupByKey()
# [END model_group_by_key_transform]
(grouped_words
- | beam.Map('count words', lambda (word, counts): (word, len(counts)))
+ | 'count words' >> beam.Map(lambda (word, counts): (word, len(counts)))
| beam.io.Write(beam.io.TextFileSink(output_path)))
p.run()
@@ -1083,8 +1083,8 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path):
# multiple possible values for each key.
# The phone_list contains values such as: ('mary': '111-222-3333') with
# multiple possible values for each key.
- emails = p | beam.Create('email', email_list)
- phones = p | beam.Create('phone', phone_list)
+ emails = p | 'email' >> beam.Create(email_list)
+ phones = p | 'phone' >> beam.Create(phone_list)
# The result PCollection contains one key-value element for each key in the
# input PCollections. The key of the pair will be the key from the input and
# the value will be a dictionary with two entries: 'emails' - an iterable of
@@ -1119,9 +1119,9 @@ def model_join_using_side_inputs(
# This code performs a join by receiving the set of names as an input and
# passing PCollections that contain emails and phone numbers as side inputs
# instead of using CoGroupByKey.
- names = p | beam.Create('names', name_list)
- emails = p | beam.Create('email', email_list)
- phones = p | beam.Create('phone', phone_list)
+ names = p | 'names' >> beam.Create(name_list)
+ emails = p | 'email' >> beam.Create(email_list)
+ phones = p | 'phone' >> beam.Create(phone_list)
def join_info(name, emails, phone_numbers):
filtered_emails = []
@@ -1149,7 +1149,7 @@ def model_join_using_side_inputs(
class Keys(beam.PTransform):
def apply(self, pcoll):
- return pcoll | beam.Map('Keys', lambda (k, v): k)
+ return pcoll | 'Keys' >> beam.Map(lambda (k, v): k)
# [END model_library_transforms_keys]
# pylint: enable=invalid-name
@@ -1160,6 +1160,6 @@ class Count(beam.PTransform):
def apply(self, pcoll):
return (
pcoll
- | beam.Map('Init', lambda v: (v, 1))
+ | 'Init' >> beam.Map(lambda v: (v, 1))
| beam.CombinePerKey(sum))
# [END model_library_transforms_count]
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 7888263..9eba46a 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -103,14 +103,14 @@ class ParDoTest(unittest.TestCase):
def test_pardo_with_label(self):
words = ['aa', 'bbc', 'defg']
# [START model_pardo_with_label]
- result = words | beam.Map('CountUniqueLetters', lambda word: len(set(word)))
+ result = words | 'CountUniqueLetters' >> beam.Map(lambda word: len(set(word)))
# [END model_pardo_with_label]
self.assertEqual({1, 2, 4}, set(result))
def test_pardo_side_input(self):
p = beam.Pipeline('DirectPipelineRunner')
- words = p | beam.Create('start', ['a', 'bb', 'ccc', 'dddd'])
+ words = p | 'start' >> beam.Create(['a', 'bb', 'ccc', 'dddd'])
# [START model_pardo_side_input]
# Callable takes additional arguments.
@@ -124,11 +124,11 @@ class ParDoTest(unittest.TestCase):
| beam.CombineGlobally(beam.combiners.MeanCombineFn()))
# Call with explicit side inputs.
- small_words = words | beam.FlatMap('small', filter_using_length, 0, 3)
+ small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)
# A single deferred side input.
larger_than_average = (words
- | beam.FlatMap('large', filter_using_length,
+ | 'large' >> beam.FlatMap(filter_using_length,
lower_bound=pvalue.AsSingleton(
avg_word_len)))
@@ -268,7 +268,7 @@ class TypeHintsTest(unittest.TestCase):
evens = numbers | beam.ParDo(FilterEvensDoFn())
# [END type_hints_do_fn]
- words = p | beam.Create('words', ['a', 'bb', 'c'])
+ words = p | 'words' >> beam.Create(['a', 'bb', 'c'])
# One can assert outputs and apply them to transforms as well.
# Helps document the contract and checks it at pipeline construction time.
# [START type_hints_transform]
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/streaming_wordcap.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py
index 7148e58..ef95a5f 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcap.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcap.py
@@ -49,7 +49,7 @@ def run(argv=None):
# Capitalize the characters in each line.
transformed = (lines
- | (beam.Map('capitalize', lambda x: x.upper())))
+ | 'capitalize' >> (beam.Map(lambda x: x.upper())))
# Write to PubSub.
# pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index eda74dd..35c1abb 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -55,11 +55,11 @@ def run(argv=None):
| (beam.FlatMap('split',
lambda x: re.findall(r'[A-Za-z\']+', x))
.with_output_types(unicode))
- | beam.Map('pair_with_one', lambda x: (x, 1))
+ | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| beam.WindowInto(window.FixedWindows(15, 0))
- | beam.GroupByKey('group')
- | beam.Map('count', lambda (word, ones): (word, sum(ones)))
- | beam.Map('format', lambda tup: '%s: %d' % tup))
+ | 'group' >> beam.GroupByKey()
+ | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
+ | 'format' >> beam.Map(lambda tup: '%s: %d' % tup))
# Write to PubSub.
# pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index bbfd43e..4744352 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -77,22 +77,22 @@ def run(argv=None):
p = beam.Pipeline(options=pipeline_options)
# Read the text file[pattern] into a PCollection.
- lines = p | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
+ lines = p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
# Count the occurrences of each word.
counts = (lines
- | (beam.ParDo('split', WordExtractingDoFn())
+ | 'split' >> (beam.ParDo(WordExtractingDoFn())
.with_output_types(unicode))
- | beam.Map('pair_with_one', lambda x: (x, 1))
- | beam.GroupByKey('group')
- | beam.Map('count', lambda (word, ones): (word, sum(ones))))
+ | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+ | 'group' >> beam.GroupByKey()
+ | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
# Format the counts into a PCollection of strings.
- output = counts | beam.Map('format', lambda (word, c): '%s: %s' % (word, c))
+ output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
- output | beam.io.Write('write', beam.io.TextFileSink(known_args.output))
+ output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
# Actually run the pipeline (all operations above are deferred).
result = p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index 74effed..e008b48 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -97,11 +97,11 @@ class CountWords(beam.PTransform):
def apply(self, pcoll):
return (pcoll
- | (beam.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
+ | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
.with_output_types(unicode))
- | beam.Map('pair_with_one', lambda x: (x, 1))
- | beam.GroupByKey('group')
- | beam.Map('count', lambda (word, ones): (word, sum(ones))))
+ | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+ | 'group' >> beam.GroupByKey()
+ | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
def run(argv=None):
@@ -126,9 +126,9 @@ def run(argv=None):
# Read the text file[pattern] into a PCollection, count the occurrences of
# each word and filter by a list of words.
filtered_words = (
- p | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
+ p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
| CountWords()
- | beam.ParDo('FilterText', FilterTextFn('Flourish|stomach')))
+ | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
# assert_that is a convenient PTransform that checks a PCollection has an
# expected value. Asserts are best used in unit tests with small data sets but
@@ -145,8 +145,8 @@ def run(argv=None):
# "Write" transform that has side effects.
# pylint: disable=unused-variable
output = (filtered_words
- | beam.Map('format', lambda (word, c): '%s: %s' % (word, c))
- | beam.io.Write('write', beam.io.TextFileSink(known_args.output)))
+ | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+ | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
# Actually run the pipeline (all operations above are deferred).
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index c3c41d7..ce5b644 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -93,22 +93,22 @@ def run(argv=None):
p = beam.Pipeline(options=pipeline_options)
# Read the text file[pattern] into a PCollection.
- lines = p | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
+ lines = p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
# Count the occurrences of each word.
counts = (lines
- | (beam.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
+ | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
.with_output_types(unicode))
- | beam.Map('pair_with_one', lambda x: (x, 1))
- | beam.GroupByKey('group')
- | beam.Map('count', lambda (word, ones): (word, sum(ones))))
+ | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+ | 'group' >> beam.GroupByKey()
+ | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
# Format the counts into a PCollection of strings.
- output = counts | beam.Map('format', lambda (word, c): '%s: %s' % (word, c))
+ output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
- output | beam.io.Write('write', beam.io.TextFileSink(known_args.output))
+ output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
# Actually run the pipeline (all operations above are deferred).
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 4d1b245..7ad3842 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -43,7 +43,7 @@ class ReadFromAvro(PTransform):
files, a ``PCollection`` for the records in these Avro files can be created
in the following manner.
p = df.Pipeline(argv=pipeline_args)
- records = p | df.io.ReadFromAvro('Read', '/mypath/myavrofiles*')
+ records = p | 'Read' >> df.io.ReadFromAvro('/mypath/myavrofiles*')
Each record of this ``PCollection`` will contain a single record read from a
source. Records that are of simple types will be mapped into corresponding
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
index f2c56dc..f789312 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -45,8 +45,8 @@ Map transform will get on each call *one* row of the main table and *all* rows
of the side table. The execution framework may use some caching techniques to
share the side inputs between calls in order to avoid excessive reading::
- main_table = pipeline | beam.io.Read(beam.io.BigQuerySource('very_big_table')
- side_table = pipeline | beam.io.Read(beam.io.BigQuerySource('not_big_table')
+ main_table = pipeline | 'very_big_table' >> beam.io.Read(beam.io.BigQuerySource()
+ side_table = pipeline | 'not_big_table' >> beam.io.Read(beam.io.BigQuerySource()
results = (
main_table
| beam.Map('process data',
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index c7837ec..1bf51b2 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -255,7 +255,7 @@ class TestFileBasedSource(unittest.TestCase):
file_name, expected_data = _write_data(100)
assert len(expected_data) == 100
pipeline = beam.Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Read('Read', LineSource(file_name))
+ pcoll = pipeline | 'Read' >> beam.Read(LineSource(file_name))
assert_that(pcoll, equal_to(expected_data))
pipeline.run()
@@ -263,7 +263,7 @@ class TestFileBasedSource(unittest.TestCase):
pattern, expected_data = _write_pattern([34, 66, 40, 24, 24, 12])
assert len(expected_data) == 200
pipeline = beam.Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Read('Read', LineSource(pattern))
+ pcoll = pipeline | 'Read' >> beam.Read(LineSource(pattern))
assert_that(pcoll, equal_to(expected_data))
pipeline.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index de5e9d4..b683eb2 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -990,7 +990,7 @@ class Write(ptransform.PTransform):
from apache_beam.io import iobase
if isinstance(self.sink, iobase.NativeSink):
# A native sink
- return pcoll | _NativeWrite('native_write', self.sink)
+ return pcoll | 'native_write' >> _NativeWrite(self.sink)
elif isinstance(self.sink, iobase.Sink):
# A custom sink
return pcoll | WriteImpl(self.sink)
@@ -1010,7 +1010,7 @@ class WriteImpl(ptransform.PTransform):
self.sink = sink
def apply(self, pcoll):
- do_once = pcoll.pipeline | core.Create('DoOnce', [None])
+ do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None])
init_result_coll = do_once | core.Map(
'initialize_write', lambda _, sink: sink.initialize_write(), self.sink)
if getattr(self.sink, 'num_shards', 0):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 86ae45f..39816c0 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -69,7 +69,7 @@ class PipelineTest(unittest.TestCase):
@staticmethod
def custom_callable(pcoll):
- return pcoll | FlatMap('+1', lambda x: [x + 1])
+ return pcoll | '+1' >> FlatMap(lambda x: [x + 1])
# Some of these tests designate a runner by name, others supply a runner.
# This variation is just to verify that both means of runner specification
@@ -78,7 +78,7 @@ class PipelineTest(unittest.TestCase):
class CustomTransform(PTransform):
def apply(self, pcoll):
- return pcoll | FlatMap('+1', lambda x: [x + 1])
+ return pcoll | '+1' >> FlatMap(lambda x: [x + 1])
class Visitor(PipelineVisitor):
@@ -98,33 +98,33 @@ class PipelineTest(unittest.TestCase):
def test_create(self):
pipeline = Pipeline(self.runner_name)
- pcoll = pipeline | Create('label1', [1, 2, 3])
+ pcoll = pipeline | 'label1' >> Create([1, 2, 3])
assert_that(pcoll, equal_to([1, 2, 3]))
# Test if initial value is an iterator object.
- pcoll2 = pipeline | Create('label2', iter((4, 5, 6)))
- pcoll3 = pcoll2 | FlatMap('do', lambda x: [x + 10])
+ pcoll2 = pipeline | 'label2' >> Create(iter((4, 5, 6)))
+ pcoll3 = pcoll2 | 'do' >> FlatMap(lambda x: [x + 10])
assert_that(pcoll3, equal_to([14, 15, 16]), label='pcoll3')
pipeline.run()
def test_create_singleton_pcollection(self):
pipeline = Pipeline(self.runner_name)
- pcoll = pipeline | Create('label', [[1, 2, 3]])
+ pcoll = pipeline | 'label' >> Create([[1, 2, 3]])
assert_that(pcoll, equal_to([[1, 2, 3]]))
pipeline.run()
def test_read(self):
pipeline = Pipeline(self.runner_name)
- pcoll = pipeline | Read('read', FakeSource([1, 2, 3]))
+ pcoll = pipeline | 'read' >> Read(FakeSource([1, 2, 3]))
assert_that(pcoll, equal_to([1, 2, 3]))
pipeline.run()
def test_visit_entire_graph(self):
pipeline = Pipeline(self.runner_name)
- pcoll1 = pipeline | Create('pcoll', [1, 2, 3])
- pcoll2 = pcoll1 | FlatMap('do1', lambda x: [x + 1])
- pcoll3 = pcoll2 | FlatMap('do2', lambda x: [x + 1])
- pcoll4 = pcoll2 | FlatMap('do3', lambda x: [x + 1])
+ pcoll1 = pipeline | 'pcoll' >> Create([1, 2, 3])
+ pcoll2 = pcoll1 | 'do1' >> FlatMap(lambda x: [x + 1])
+ pcoll3 = pcoll2 | 'do2' >> FlatMap(lambda x: [x + 1])
+ pcoll4 = pcoll2 | 'do3' >> FlatMap(lambda x: [x + 1])
transform = PipelineTest.CustomTransform()
pcoll5 = pcoll4 | transform
@@ -140,15 +140,15 @@ class PipelineTest(unittest.TestCase):
def test_apply_custom_transform(self):
pipeline = Pipeline(self.runner_name)
- pcoll = pipeline | Create('pcoll', [1, 2, 3])
+ pcoll = pipeline | 'pcoll' >> Create([1, 2, 3])
result = pcoll | PipelineTest.CustomTransform()
assert_that(result, equal_to([2, 3, 4]))
pipeline.run()
def test_reuse_custom_transform_instance(self):
pipeline = Pipeline(self.runner_name)
- pcoll1 = pipeline | Create('pcoll1', [1, 2, 3])
- pcoll2 = pipeline | Create('pcoll2', [4, 5, 6])
+ pcoll1 = pipeline | 'pcoll1' >> Create([1, 2, 3])
+ pcoll2 = pipeline | 'pcoll2' >> Create([4, 5, 6])
transform = PipelineTest.CustomTransform()
pcoll1 | transform
with self.assertRaises(RuntimeError) as cm:
@@ -183,7 +183,7 @@ class PipelineTest(unittest.TestCase):
self.assertEqual(
['a-x', 'b-x', 'c-x'],
- sorted(['a', 'b', 'c'] | AddSuffix('-x')))
+ sorted(['a', 'b', 'c'] | '-x' >> AddSuffix()))
def test_cached_pvalues_are_refcounted(self):
"""Test that cached PValues are refcounted and deleted.
@@ -213,17 +213,17 @@ class PipelineTest(unittest.TestCase):
gc.collect()
count_threshold = len(gc.get_objects()) + 10000
- biglist = pipeline | Create('oom:create', ['x'] * num_elements)
+ biglist = pipeline | 'oom:create' >> Create(['x'] * num_elements)
dupes = (
biglist
- | Map('oom:addone', lambda x: (x, 1))
- | FlatMap('oom:dupes', create_dupes,
+ | 'oom:addone' >> Map(lambda x: (x, 1))
+ | 'oom:dupes' >> FlatMap(create_dupes,
AsIter(biglist)).with_outputs('side', main='main'))
result = (
(dupes.side, dupes.main, dupes.side)
- | Flatten('oom:flatten')
- | CombinePerKey('oom:combine', sum)
- | Map('oom:check', check_memory, count_threshold))
+ | 'oom:flatten' >> Flatten()
+ | 'oom:combine' >> CombinePerKey(sum)
+ | 'oom:check' >> Map(check_memory, count_threshold))
assert_that(result, equal_to([('x', 3 * num_elements)]))
pipeline.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/pvalue_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue_test.py b/sdks/python/apache_beam/pvalue_test.py
index bb742e0..323ca33 100644
--- a/sdks/python/apache_beam/pvalue_test.py
+++ b/sdks/python/apache_beam/pvalue_test.py
@@ -45,9 +45,9 @@ class PValueTest(unittest.TestCase):
def test_pcollectionview_not_recreated(self):
pipeline = Pipeline('DirectPipelineRunner')
- value = pipeline | Create('create1', [1, 2, 3])
- value2 = pipeline | Create('create2', [(1, 1), (2, 2), (3, 3)])
- value3 = pipeline | Create('create3', [(1, 1), (2, 2), (3, 3)])
+ value = pipeline | 'create1' >> Create([1, 2, 3])
+ value2 = pipeline | 'create2' >> Create([(1, 1), (2, 2), (3, 3)])
+ value3 = pipeline | 'create3' >> Create([(1, 1), (2, 2), (3, 3)])
self.assertEqual(AsSingleton(value), AsSingleton(value))
self.assertEqual(AsSingleton('new', value, default_value=1),
AsSingleton('new', value, default_value=1))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
index b3b2968..3cd8d73 100644
--- a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
+++ b/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
@@ -103,8 +103,8 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
pvalue.ListPCollectionView))
def test_co_group_by_key(self):
- emails = self.pipeline | Create('email', [('joe', 'joe@example.com')])
- phones = self.pipeline | Create('phone', [('mary', '111-222-3333')])
+ emails = self.pipeline | 'email' >> Create([('joe', 'joe@example.com')])
+ phones = self.pipeline | 'phone' >> Create([('mary', '111-222-3333')])
{'emails': emails, 'phones': phones} | CoGroupByKey()
self.pipeline.visit(self.visitor)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 2863756..04de7fb 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -60,9 +60,9 @@ class RunnerTest(unittest.TestCase):
'--no_auth=True'
]))
- (p | ptransform.Create('create', [1, 2, 3]) # pylint: disable=expression-not-assigned
- | ptransform.FlatMap('do', lambda x: [(x, x)])
- | ptransform.GroupByKey('gbk'))
+ (p | 'create' >> ptransform.Create([1, 2, 3]) # pylint: disable=expression-not-assigned
+ | 'do' >> ptransform.FlatMap(lambda x: [(x, x)])
+ | 'gbk' >> ptransform.GroupByKey())
remote_runner.job = apiclient.Job(p.options)
super(DataflowPipelineRunner, remote_runner).run(p)
[08/12] incubator-beam git commit: Fixes examples
Posted by ro...@apache.org.
Fixes examples
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2ff5630b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2ff5630b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2ff5630b
Branch: refs/heads/python-sdk
Commit: 2ff5630be06034e25d60e105bbe1a718ae06b4d6
Parents: 362f2e9
Author: Chamikara Jayalath <ch...@google.com>
Authored: Fri Jul 22 16:04:29 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:46 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/examples/complete/autocomplete.py | 4 ++--
sdks/python/apache_beam/examples/complete/estimate_pi.py | 3 ---
sdks/python/apache_beam/examples/snippets/snippets.py | 8 ++++----
3 files changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ff5630b/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index b68bc56..10d9009 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -56,8 +56,8 @@ def run(argv=None):
class TopPerPrefix(beam.PTransform):
- def __init__(self, label, count):
- super(TopPerPrefix, self).__init__(label)
+ def __init__(self, count):
+ super(TopPerPrefix, self).__init__()
self._count = count
def apply(self, words):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ff5630b/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index ef9f8cc..c33db1d 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -90,9 +90,6 @@ class JsonCoder(object):
class EstimatePiTransform(beam.PTransform):
"""Runs 10M trials, and combine the results to estimate pi."""
- def __init__(self, label):
- super(EstimatePiTransform, self).__init__(label)
-
def apply(self, pcoll):
# A hundred work items of a hundred thousand tries each.
return (pcoll
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ff5630b/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index c605db8..9d1df82 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -466,7 +466,7 @@ def examples_wordcount_minimal(renames):
# [END examples_wordcount_minimal_map]
# [START examples_wordcount_minimal_write]
- | 'gs://my-bucket/counts.txt' >> beam.io.Write(beam.io.TextFileSink())
+ | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
# [END examples_wordcount_minimal_write]
)
@@ -531,7 +531,7 @@ def examples_wordcount_wordcount(renames):
formatted = counts | beam.ParDo(FormatAsTextFn())
# [END examples_wordcount_wordcount_dofn]
- formatted | 'gs://my-bucket/counts.txt' >> beam.io.Write(beam.io.TextFileSink())
+ formatted | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
p.visit(SnippetUtils.RenameFiles(renames))
p.run()
@@ -702,8 +702,8 @@ def model_custom_source(count):
# [START model_custom_source_new_ptransform]
class ReadFromCountingSource(PTransform):
- def __init__(self, label, count, **kwargs):
- super(ReadFromCountingSource, self).__init__(label, **kwargs)
+ def __init__(self, count, **kwargs):
+ super(ReadFromCountingSource, self).__init__(**kwargs)
self._count = count
def apply(self, pcoll):
[06/12] incubator-beam git commit: fix pipeline test
Posted by ro...@apache.org.
fix pipeline test
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/362f2e9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/362f2e9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/362f2e9e
Branch: refs/heads/python-sdk
Commit: 362f2e9e4398662d55a2a4e6399f43155c58ed24
Parents: 0183051
Author: Ahmet Altay <al...@google.com>
Authored: Fri Jul 22 16:03:08 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:46 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline_test.py | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/362f2e9e/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 39816c0..327f26c 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -183,7 +183,7 @@ class PipelineTest(unittest.TestCase):
self.assertEqual(
['a-x', 'b-x', 'c-x'],
- sorted(['a', 'b', 'c'] | '-x' >> AddSuffix()))
+ sorted(['a', 'b', 'c'] | 'AddSuffix' >> AddSuffix('-x')))
def test_cached_pvalues_are_refcounted(self):
"""Test that cached PValues are refcounted and deleted.
@@ -218,7 +218,7 @@ class PipelineTest(unittest.TestCase):
biglist
| 'oom:addone' >> Map(lambda x: (x, 1))
| 'oom:dupes' >> FlatMap(create_dupes,
- AsIter(biglist)).with_outputs('side', main='main'))
+ AsIter(biglist)).with_outputs('side', main='main'))
result = (
(dupes.side, dupes.main, dupes.side)
| 'oom:flatten' >> Flatten()
@@ -232,8 +232,8 @@ class PipelineTest(unittest.TestCase):
{
'oom:flatten': 3 * num_elements,
('oom:combine/GroupByKey/reify_windows', None): 3 * num_elements,
- ('oom:dupes/oom:dupes', 'side'): num_elements,
- ('oom:dupes/oom:dupes', None): num_elements,
+ ('oom:dupes/FlatMap(create_dupes)', 'side'): num_elements,
+ ('oom:dupes/FlatMap(create_dupes)', None): num_elements,
'oom:create': num_elements,
('oom:addone', None): num_elements,
'oom:combine/GroupByKey/group_by_key': 1,
[09/12] incubator-beam git commit: Fix label-sensitive test.
Posted by ro...@apache.org.
Fix label-sensitive test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2a59a121
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2a59a121
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2a59a121
Branch: refs/heads/python-sdk
Commit: 2a59a121441a003bad949ae6a23d58a9cf2b3059
Parents: 2ff5630
Author: Robert Bradshaw <ro...@google.com>
Authored: Fri Jul 22 16:24:48 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:46 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 4 ++++
sdks/python/apache_beam/transforms/ptransform_test.py | 14 +++++++-------
2 files changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a59a121/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index aeed9f9..0572466 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -348,6 +348,10 @@ class AppliedPTransform(object):
# root producer.
self.refcounts = collections.defaultdict(int)
+ def __repr__(self):
+ return "%s(%s, %s)" % (self.__class__.__name__, self.full_label,
+ type(self.transform).__name__)
+
def update_input_refcounts(self):
"""Increment refcounts for all transforms providing inputs."""
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a59a121/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 8121c1e..3a71ec3 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -54,28 +54,28 @@ class PTransformTest(unittest.TestCase):
pa = Pipeline('DirectPipelineRunner')
res = pa | 'a_label' >> beam.Create([1, 2])
- self.assertEqual('<Create(PTransform) label=[a_label]>',
- str(res.producer.transform))
+ self.assertEqual('AppliedPTransform(a_label, Create)',
+ str(res.producer))
pc = Pipeline('DirectPipelineRunner')
- res = pc | 'with_inputs' >> beam.Create([1, 2])
+ res = pc | beam.Create([1, 2])
inputs_tr = res.producer.transform
inputs_tr.inputs = ('ci',)
self.assertEqual(
- """<Create(PTransform) label=[with_inputs] inputs=('ci',)>""",
+ """<Create(PTransform) label=[Create] inputs=('ci',)>""",
str(inputs_tr))
pd = Pipeline('DirectPipelineRunner')
- res = pd | 'with_sidei' >> beam.Create([1, 2])
+ res = pd | beam.Create([1, 2])
side_tr = res.producer.transform
side_tr.side_inputs = (4,)
self.assertEqual(
- '<Create(PTransform) label=[with_sidei] side_inputs=(4,)>',
+ '<Create(PTransform) label=[Create] side_inputs=(4,)>',
str(side_tr))
inputs_tr.side_inputs = ('cs',)
self.assertEqual(
- """<Create(PTransform) label=[with_inputs] """
+ """<Create(PTransform) label=[Create] """
"""inputs=('ci',) side_inputs=('cs',)>""",
str(inputs_tr))
[10/12] incubator-beam git commit: Lint fixes.
Posted by ro...@apache.org.
Lint fixes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b15d35ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b15d35ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b15d35ca
Branch: refs/heads/python-sdk
Commit: b15d35ca6e585e75153e05d96403336889cc6894
Parents: 2a59a12
Author: Robert Bradshaw <ro...@google.com>
Authored: Fri Jul 22 18:35:22 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:46 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/dataflow_test.py | 7 +-
.../complete/juliaset/juliaset/juliaset.py | 9 +-
.../apache_beam/examples/complete/tfidf_test.py | 2 +-
.../examples/cookbook/bigquery_side_input.py | 14 +-
.../cookbook/bigquery_side_input_test.py | 4 +-
.../examples/cookbook/bigquery_tornadoes.py | 6 +-
.../apache_beam/examples/cookbook/bigshuffle.py | 5 +-
.../apache_beam/examples/cookbook/filters.py | 2 +-
.../apache_beam/examples/snippets/snippets.py | 2 +-
.../examples/snippets/snippets_test.py | 8 +-
sdks/python/apache_beam/examples/wordcount.py | 2 +-
.../apache_beam/examples/wordcount_debugging.py | 2 +-
.../apache_beam/examples/wordcount_minimal.py | 2 +-
sdks/python/apache_beam/io/bigquery.py | 4 +-
sdks/python/apache_beam/pipeline_test.py | 4 +-
.../apache_beam/transforms/combiners_test.py | 4 +-
sdks/python/apache_beam/transforms/core.py | 7 +-
.../apache_beam/transforms/ptransform_test.py | 161 ++++++++++---------
sdks/python/apache_beam/transforms/util.py | 3 +-
.../typehints/typed_pipeline_test.py | 2 +-
20 files changed, 127 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/dataflow_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
index bf66851..cc3a526 100644
--- a/sdks/python/apache_beam/dataflow_test.py
+++ b/sdks/python/apache_beam/dataflow_test.py
@@ -114,8 +114,8 @@ class DataflowTest(unittest.TestCase):
words = pipeline | 'SomeWords' >> Create(words_list)
prefix = 'zyx'
suffix = pipeline | 'SomeString' >> Create(['xyz']) # side in
- result = words | 'DecorateWordsDoFn' >> ParDo(SomeDoFn(), prefix,
- suffix=AsSingleton(suffix))
+ result = words | 'DecorateWordsDoFn' >> ParDo(
+ SomeDoFn(), prefix, suffix=AsSingleton(suffix))
assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
pipeline.run()
@@ -179,8 +179,7 @@ class DataflowTest(unittest.TestCase):
pipeline = Pipeline('DirectPipelineRunner')
pcol = pipeline | 'start' >> Create([1, 2])
side = pipeline | 'side' >> Create([]) # 0 values in side input.
- result = (
- pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side, 10)))
+ result = pcol | FlatMap(lambda x, s: [x * s], AsSingleton(side, 10))
assert_that(result, equal_to([10, 20]))
pipeline.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 56696c3..1445fbe 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -105,11 +105,12 @@ def run(argv=None): # pylint: disable=missing-docstring
# Group each coordinate triplet by its x value, then write the coordinates to
# the output file with an x-coordinate grouping per line.
# pylint: disable=expression-not-assigned
- (coordinates | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i)))
- | 'x coord' >> beam.GroupByKey() | beam.Map(
- 'format',
+ (coordinates
+ | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i)))
+ | 'x coord' >> beam.GroupByKey()
+ | 'format' >> beam.Map(
lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords))
- | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output)))
+ | beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output)))
# pylint: enable=expression-not-assigned
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index ee7e534..f30b832 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -56,7 +56,7 @@ class TfIdfTest(unittest.TestCase):
result = (
uri_to_line
| tfidf.TfIdf()
- | 'flatten' >> beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
+ | beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
beam.assert_that(result, beam.equal_to(EXPECTED_RESULTS))
# Run the pipeline. Note that the assert_that above adds to the pipeline
# a check that the result PCollection contains expected values. To actually
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index 1db4a1e..2099e48 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -101,11 +101,12 @@ def run(argv=None):
ignore_corpus = known_args.ignore_corpus
ignore_word = known_args.ignore_word
- pcoll_corpus = p | beam.Read('read corpus',
- beam.io.BigQuerySource(query=query_corpus))
- pcoll_word = p | beam.Read('read words',
- beam.io.BigQuerySource(query=query_word))
- pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create([ignore_corpus])
+ pcoll_corpus = p | 'read corpus' >> beam.io.Read(
+ beam.io.BigQuerySource(query=query_corpus))
+ pcoll_word = p | 'read_words' >> beam.Read(
+ beam.io.BigQuerySource(query=query_word))
+ pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create(
+ [ignore_corpus])
pcoll_ignore_word = p | 'create_ignore_word' >> beam.Create([ignore_word])
pcoll_group_ids = p | 'create groups' >> beam.Create(group_ids)
@@ -113,8 +114,7 @@ def run(argv=None):
pcoll_ignore_corpus, pcoll_ignore_word)
# pylint:disable=expression-not-assigned
- pcoll_groups | beam.io.Write('WriteToText',
- beam.io.TextFileSink(known_args.output))
+ pcoll_groups | beam.io.Write(beam.io.TextFileSink(known_args.output))
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index 215aafa..e2b20f3 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -35,8 +35,8 @@ class BigQuerySideInputTest(unittest.TestCase):
{'f': 'corpus2'},
{'f': 'corpus3'}])
words_pcoll = p | 'create_words' >> beam.Create([{'f': 'word1'},
- {'f': 'word2'},
- {'f': 'word3'}])
+ {'f': 'word2'},
+ {'f': 'word3'}])
ignore_corpus_pcoll = p | 'create_ignore_corpus' >> beam.Create(['corpus1'])
ignore_word_pcoll = p | 'create_ignore_word' >> beam.Create(['word1'])
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index cdaee36..6e1326c 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -53,11 +53,11 @@ def count_tornadoes(input_data):
"""
return (input_data
- | beam.FlatMap(
- 'months with tornadoes',
+ | 'months with tornatoes' >> beam.FlatMap(
lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
| 'monthly count' >> beam.CombinePerKey(sum)
- | 'format' >> beam.Map(lambda (k, v): {'month': k, 'tornado_count': v}))
+ | 'format' >> beam.Map(
+ lambda (k, v): {'month': k, 'tornado_count': v}))
def run(argv=None):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index c29a038..f7070dc 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -59,8 +59,9 @@ def run(argv=None):
# Count the occurrences of each word.
output = (lines
- | 'split' >> beam.Map(lambda x: (x[:10], x[10:99])
- ).with_output_types(beam.typehints.KV[str, str])
+ | 'split' >> beam.Map(
+ lambda x: (x[:10], x[10:99]))
+ .with_output_types(beam.typehints.KV[str, str])
| 'group' >> beam.GroupByKey()
| beam.FlatMap(
'format',
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/filters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py
index b19b566..efd0ba7 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters.py
@@ -88,7 +88,7 @@ def run(argv=None):
p = beam.Pipeline(argv=pipeline_args)
- input_data = p | 'input' >> beam.Read(beam.io.BigQuerySource(known_args.input))
+ input_data = p | beam.Read(beam.io.BigQuerySource(known_args.input))
# pylint: disable=expression-not-assigned
(filter_cold_days(input_data, known_args.month_filter)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 9d1df82..9f3d6e1 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -307,7 +307,7 @@ def pipeline_options_command_line(argv):
p = beam.Pipeline(argv=pipeline_args)
lines = p | beam.io.Read('ReadFromText',
beam.io.TextFileSource(known_args.input))
- lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
+ lines | beam.io.Write(beam.io.TextFileSink(known_args.output))
# [END pipeline_options_command_line]
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 9eba46a..edc0a17 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -101,6 +101,7 @@ class ParDoTest(unittest.TestCase):
self.assertEqual({'A', 'C'}, set(all_capitals))
def test_pardo_with_label(self):
+ # pylint: disable=line-too-long
words = ['aa', 'bbc', 'defg']
# [START model_pardo_with_label]
result = words | 'CountUniqueLetters' >> beam.Map(lambda word: len(set(word)))
@@ -127,10 +128,9 @@ class ParDoTest(unittest.TestCase):
small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)
# A single deferred side input.
- larger_than_average = (words
- | 'large' >> beam.FlatMap(filter_using_length,
- lower_bound=pvalue.AsSingleton(
- avg_word_len)))
+ larger_than_average = (words | 'large' >> beam.FlatMap(
+ filter_using_length,
+ lower_bound=pvalue.AsSingleton(avg_word_len)))
# Mix and match.
small_but_nontrivial = words | beam.FlatMap(filter_using_length,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 4744352..096e508 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -82,7 +82,7 @@ def run(argv=None):
# Count the occurrences of each word.
counts = (lines
| 'split' >> (beam.ParDo(WordExtractingDoFn())
- .with_output_types(unicode))
+ .with_output_types(unicode))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index e008b48..473a486 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -98,7 +98,7 @@ class CountWords(beam.PTransform):
def apply(self, pcoll):
return (pcoll
| 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
- .with_output_types(unicode))
+ .with_output_types(unicode))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index ce5b644..4073f7b 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -98,7 +98,7 @@ def run(argv=None):
# Count the occurrences of each word.
counts = (lines
| 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
- .with_output_types(unicode))
+ .with_output_types(unicode))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
index f789312..50c2eaf 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -45,8 +45,8 @@ Map transform will get on each call *one* row of the main table and *all* rows
of the side table. The execution framework may use some caching techniques to
share the side inputs between calls in order to avoid excessive reading::
- main_table = pipeline | 'very_big_table' >> beam.io.Read(beam.io.BigQuerySource()
- side_table = pipeline | 'not_big_table' >> beam.io.Read(beam.io.BigQuerySource()
+ main_table = pipeline | 'very_big' >> beam.io.Read(beam.io.BigQuerySource()
+ side_table = pipeline | 'not_big' >> beam.io.Read(beam.io.BigQuerySource()
results = (
main_table
| beam.Map('process data',
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 327f26c..8a0d246 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -217,8 +217,8 @@ class PipelineTest(unittest.TestCase):
dupes = (
biglist
| 'oom:addone' >> Map(lambda x: (x, 1))
- | 'oom:dupes' >> FlatMap(create_dupes,
- AsIter(biglist)).with_outputs('side', main='main'))
+ | 'oom:dupes' >> FlatMap(
+ create_dupes, AsIter(biglist)).with_outputs('side', main='main'))
result = (
(dupes.side, dupes.main, dupes.side)
| 'oom:flatten' >> Flatten()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index c970382..bfe168d 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -219,8 +219,8 @@ class CombineTest(unittest.TestCase):
return main | Map(lambda _, s: s, side)
p = Pipeline('DirectPipelineRunner')
- result1 = p | 'label1' >> Create([]) | 'L1' >> CombineWithSideInput()
- result2 = p | 'label2' >> Create([1, 2, 3, 4]) | 'L2' >> CombineWithSideInput()
+ result1 = p | 'i1' >> Create([]) | 'c1' >> CombineWithSideInput()
+ result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> CombineWithSideInput()
assert_that(result1, equal_to([0]), label='r1')
assert_that(result2, equal_to([10]), label='r2')
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 44a6d29..5e6aafc 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -814,9 +814,9 @@ class CombineGlobally(PTransform):
return transform
combined = (pcoll
- | 'KeyWithVoid' >> add_input_types(Map(lambda v: (None, v))
- .with_output_types(
- KV[None, pcoll.element_type]))
+ | 'KeyWithVoid' >> add_input_types(
+ Map(lambda v: (None, v)).with_output_types(
+ KV[None, pcoll.element_type]))
| CombinePerKey(
'CombinePerKey', self.fn, *self.args, **self.kwargs)
| 'UnKey' >> Map(lambda (k, v): v))
@@ -1044,6 +1044,7 @@ class GroupByKey(PTransform):
KV[key_type, Iterable[typehints.WindowedValue[value_type]]])
gbk_output_type = KV[key_type, Iterable[value_type]]
+ # pylint: disable=bad-continuation
return (pcoll
| 'reify_windows' >> (ParDo(self.ReifyWindows())
.with_output_types(reify_output_type))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 3a71ec3..992f944 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -288,7 +288,7 @@ class PTransformTest(unittest.TestCase):
vals_2 = [2, 4, 6, 8, 10, 12, 14]
pipeline = Pipeline('DirectPipelineRunner')
pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
- [('b', x) for x in vals_2]))
+ [('b', x) for x in vals_2]))
result = pcoll | 'mean' >> beam.CombinePerKey(self._MeanCombineFn())
assert_that(result, equal_to([('a', sum(vals_1) / len(vals_1)),
('b', sum(vals_2) / len(vals_2))]))
@@ -299,7 +299,7 @@ class PTransformTest(unittest.TestCase):
vals_2 = [2, 4, 6, 8, 10, 12, 14]
pipeline = Pipeline('DirectPipelineRunner')
pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
- [('b', x) for x in vals_2]))
+ [('b', x) for x in vals_2]))
result = pcoll | beam.CombinePerKey(sum)
assert_that(result, equal_to([('a', sum(vals_1)), ('b', sum(vals_2))]))
pipeline.run()
@@ -309,7 +309,7 @@ class PTransformTest(unittest.TestCase):
vals_2 = [2, 4, 6, 8, 10, 12, 14]
pipeline = Pipeline('DirectPipelineRunner')
pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
- [('b', x) for x in vals_2]))
+ [('b', x) for x in vals_2]))
divisor = pipeline | 'divisor' >> beam.Create([2])
result = pcoll | beam.CombinePerKey(
lambda vals, d: max(v for v in vals if v % d == 0),
@@ -513,13 +513,14 @@ class PTransformTest(unittest.TestCase):
def test_apply_to_list(self):
self.assertItemsEqual(
[1, 2, 3], [0, 1, 2] | 'add_one' >> beam.Map(lambda x: x + 1))
- self.assertItemsEqual([1], [0, 1, 2] | 'odd' >> beam.Filter(lambda x: x % 2))
+ self.assertItemsEqual([1],
+ [0, 1, 2] | 'odd' >> beam.Filter(lambda x: x % 2))
self.assertItemsEqual([1, 2, 100, 3],
- ([1, 2, 3], [100]) | 'flat' >> beam.Flatten())
+ ([1, 2, 3], [100]) | beam.Flatten())
join_input = ([('k', 'a')],
[('k', 'b'), ('k', 'c')])
self.assertItemsEqual([('k', (['a'], ['b', 'c']))],
- join_input | 'join' >> beam.CoGroupByKey())
+ join_input | beam.CoGroupByKey())
def test_multi_input_ptransform(self):
class DisjointUnion(PTransform):
@@ -776,9 +777,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
| 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
- | 'score' >> (beam.FlatMap(lambda x: [1] if x == 't' else [2])
+ | ('score' >> beam.FlatMap(lambda x: [1] if x == 't' else [2])
.with_input_types(str).with_output_types(int))
- | 'upper' >> (beam.FlatMap(lambda x: [x.upper()])
+ | ('upper' >> beam.FlatMap(lambda x: [x.upper()])
.with_input_types(str).with_output_types(str)))
self.assertEqual("Type hint violation for 'upper': "
@@ -866,7 +867,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_filter_type_checks_using_type_hints_method(self):
# No error should be raised if this type-checks properly.
d = (self.p
- | 'strs' >> beam.Create(['1', '2', '3', '4', '5']).with_output_types(str)
+ | beam.Create(['1', '2', '3', '4', '5']).with_output_types(str)
| 'to int' >> beam.Map(lambda x: int(x))
.with_input_types(str).with_output_types(int)
| 'below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))
@@ -904,9 +905,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_group_by_key_only_output_type_deduction(self):
d = (self.p
| 'str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
- | 'pair' >> (beam.Map(lambda x: (x, ord(x)))
+ | ('pair' >> beam.Map(lambda x: (x, ord(x)))
.with_output_types(typehints.KV[str, str]))
- | 'O' >> beam.GroupByKeyOnly())
+ | beam.GroupByKeyOnly())
# Output type should correctly be deduced.
# GBK-only should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
@@ -916,9 +917,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_group_by_key_output_type_deduction(self):
d = (self.p
| 'str' >> beam.Create(range(20)).with_output_types(int)
- | 'pair negative' >> (beam.Map(lambda x: (x % 5, -x))
+ | ('pair negative' >> beam.Map(lambda x: (x % 5, -x))
.with_output_types(typehints.KV[int, int]))
- | 'T' >> beam.GroupByKey())
+ | beam.GroupByKey())
# Output type should correctly be deduced.
# GBK should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
@@ -929,8 +930,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# GBK will be passed raw int's here instead of some form of KV[A, B].
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 's' >> beam.Create([1, 2, 3]).with_output_types(int)
- | 'F' >> beam.GroupByKeyOnly())
+ | beam.Create([1, 2, 3]).with_output_types(int)
+ | beam.GroupByKeyOnly())
self.assertEqual("Input type hint violation at F: "
"expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -942,9 +943,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# aliased to Tuple[int, str].
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 's' >> (beam.Create(range(5))
+ | (beam.Create(range(5))
.with_output_types(typehints.Iterable[int]))
- | 'T' >> beam.GroupByKey())
+ | beam.GroupByKey())
self.assertEqual("Input type hint violation at T: "
"expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -973,7 +974,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
(self.p
| 'nums' >> beam.Create(range(5)).with_output_types(int)
| 'mod dup' >> beam.Map(lambda x: (x % 2, x))
- | 'G' >> beam.GroupByKeyOnly())
+ | beam.GroupByKeyOnly())
self.assertEqual('Pipeline type checking is enabled, however no output '
'type-hint was found for the PTransform '
@@ -1092,10 +1093,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# ParDo should receive an instance of type 'str', however an 'int' will be
# passed instead.
with self.assertRaises(typehints.TypeCheckError) as e:
- (self.p | 'n' >> beam.Create([1, 2, 3])
- | 'to int' >> (beam.FlatMap(lambda x: [int(x)])
- .with_input_types(str).with_output_types(int))
- )
+ (self.p
+ | beam.Create([1, 2, 3])
+ | ('to int' >> beam.FlatMap(lambda x: [int(x)])
+ .with_input_types(str).with_output_types(int)))
self.p.run()
self.assertStartswith(
@@ -1111,8 +1112,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'n' >> beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
- | 'add' >> (beam.FlatMap(lambda (x, y): [x + y])
+ | beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
+ | ('add' >> beam.FlatMap(lambda (x, y): [x + y])
.with_input_types(typehints.Tuple[int, int]).with_output_types(int))
)
self.p.run()
@@ -1136,8 +1137,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
lambda x: [float(x)]).with_input_types(int).with_output_types(
int).get_type_hints()
with self.assertRaises(typehints.TypeCheckError) as e:
- (self.p | 'n' >> beam.Create([1, 2, 3])
- | 'to int' >> (beam.FlatMap(lambda x: [float(x)])
+ (self.p
+ | beam.Create([1, 2, 3])
+ | ('to int' >> beam.FlatMap(lambda x: [float(x)])
.with_input_types(int).with_output_types(int))
)
self.p.run()
@@ -1159,8 +1161,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# of 'int' will be generated instead.
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'n' >> beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
- | 'swap' >> (beam.FlatMap(lambda (x, y): [x + y])
+ | beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
+ | ('swap' >> beam.FlatMap(lambda (x, y): [x + y])
.with_input_types(typehints.Tuple[int, float])
.with_output_types(typehints.Tuple[float, int]))
)
@@ -1183,7 +1185,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
return a + b
with self.assertRaises(typehints.TypeCheckError) as e:
- (self.p | 't' >> beam.Create([1, 2, 3, 4]) | 'add 1' >> beam.Map(add, 1.0))
+ (self.p | beam.Create([1, 2, 3, 4]) | 'add 1' >> beam.Map(add, 1.0))
self.p.run()
self.assertStartswith(
@@ -1199,8 +1201,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 't' >> beam.Create([1, 2, 3, 4])
- | 'add 1' >> (beam.Map(lambda x, one: x + one, 1.0)
+ | beam.Create([1, 2, 3, 4])
+ | ('add 1' >> beam.Map(lambda x, one: x + one, 1.0)
.with_input_types(int, int)
.with_output_types(float)))
self.p.run()
@@ -1305,8 +1307,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_combine_pipeline_type_check_using_methods(self):
d = (self.p
- | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
- | 'concat' >> (beam.CombineGlobally(lambda s: ''.join(s))
+ | beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+ | ('concat' >> beam.CombineGlobally(lambda s: ''.join(s))
.with_input_types(str).with_output_types(str)))
def matcher(expected):
@@ -1321,8 +1323,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 's' >> beam.Create(range(5)).with_output_types(int)
- | 'sum' >> (beam.CombineGlobally(lambda s: sum(s))
+ | beam.Create(range(5)).with_output_types(int)
+ | ('sum' >> beam.CombineGlobally(lambda s: sum(s))
.with_input_types(int).with_output_types(int)))
assert_that(d, equal_to([10]))
@@ -1331,8 +1333,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_combine_pipeline_type_check_violation_using_methods(self):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'e' >> beam.Create(range(3)).with_output_types(int)
- | 'sort join' >> (beam.CombineGlobally(lambda s: ''.join(sorted(s)))
+ | beam.Create(range(3)).with_output_types(int)
+ | ('sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
.with_input_types(str).with_output_types(str)))
self.assertEqual("Input type hint violation at sort join: "
@@ -1345,8 +1347,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'e' >> beam.Create(range(3)).with_output_types(int)
- | 'sort join' >> (beam.CombineGlobally(lambda s: ''.join(sorted(s)))
+ | beam.Create(range(3)).with_output_types(int)
+ | ('sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
.with_input_types(str).with_output_types(str)))
self.p.run()
@@ -1427,8 +1429,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_mean_per_key_pipeline_checking_satisfied(self):
d = (self.p
- | 'c' >> beam.Create(range(5)).with_output_types(int)
- | 'even group' >> (beam.Map(lambda x: (not x % 2, x))
+ | beam.Create(range(5)).with_output_types(int)
+ | ('even group' >> beam.Map(lambda x: (not x % 2, x))
.with_output_types(typehints.KV[bool, int]))
| 'even mean' >> combine.Mean.PerKey())
@@ -1439,8 +1441,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_mean_per_key_pipeline_checking_violated(self):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'e' >> beam.Create(map(str, range(5))).with_output_types(str)
- | 'upper pair' >> (beam.Map(lambda x: (x.upper(), x))
+ | beam.Create(map(str, range(5))).with_output_types(str)
+ | ('upper pair' >> beam.Map(lambda x: (x.upper(), x))
.with_output_types(typehints.KV[str, str]))
| 'even mean' >> combine.Mean.PerKey())
self.p.run()
@@ -1455,8 +1457,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 'c' >> beam.Create(range(5)).with_output_types(int)
- | 'odd group' >> (beam.Map(lambda x: (bool(x % 2), x))
+ | beam.Create(range(5)).with_output_types(int)
+ | ('odd group' >> beam.Map(lambda x: (bool(x % 2), x))
.with_output_types(typehints.KV[bool, int]))
| 'odd mean' >> combine.Mean.PerKey())
@@ -1470,8 +1472,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'c' >> beam.Create(range(5)).with_output_types(int)
- | 'odd group' >> (beam.Map(lambda x: (x, str(bool(x % 2))))
+ | beam.Create(range(5)).with_output_types(int)
+ | ('odd group' >> beam.Map(lambda x: (x, str(bool(x % 2))))
.with_output_types(typehints.KV[int, str]))
| 'odd mean' >> combine.Mean.PerKey())
self.p.run()
@@ -1514,8 +1516,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_count_perkey_pipeline_type_checking_satisfied(self):
d = (self.p
- | 'p' >> beam.Create(range(5)).with_output_types(int)
- | 'even group' >> (beam.Map(lambda x: (not x % 2, x))
+ | beam.Create(range(5)).with_output_types(int)
+ | ('even group' >> beam.Map(lambda x: (not x % 2, x))
.with_output_types(typehints.KV[bool, int]))
| 'count int' >> combine.Count.PerKey())
@@ -1526,7 +1528,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_count_perkey_pipeline_type_checking_violated(self):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'p' >> beam.Create(range(5)).with_output_types(int)
+ | beam.Create(range(5)).with_output_types(int)
| 'count int' >> combine.Count.PerKey())
self.assertEqual("Input type hint violation at GroupByKey: "
@@ -1538,7 +1540,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 'c' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+ | beam.Create(['t', 'e', 's', 't']).with_output_types(str)
| 'dup key' >> beam.Map(lambda x: (x, x))
.with_output_types(typehints.KV[str, str])
| 'count dups' >> combine.Count.PerKey())
@@ -1549,7 +1551,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_count_perelement_pipeline_type_checking_satisfied(self):
d = (self.p
- | 'w' >> beam.Create([1, 1, 2, 3]).with_output_types(int)
+ | beam.Create([1, 1, 2, 3]).with_output_types(int)
| 'count elems' >> combine.Count.PerElement())
self.assertCompatible(typehints.KV[int, int], d.element_type)
@@ -1561,7 +1563,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'f' >> beam.Create([1, 1, 2, 3])
+ | beam.Create([1, 1, 2, 3])
| 'count elems' >> combine.Count.PerElement())
self.assertEqual('Pipeline type checking is enabled, however no output '
@@ -1573,7 +1575,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 'w' >> beam.Create([True, True, False, True, True])
+ | beam.Create([True, True, False, True, True])
.with_output_types(bool)
| 'count elems' >> combine.Count.PerElement())
@@ -1583,7 +1585,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_top_of_pipeline_checking_satisfied(self):
d = (self.p
- | 'n' >> beam.Create(range(5, 11)).with_output_types(int)
+ | beam.Create(range(5, 11)).with_output_types(int)
| 'top 3' >> combine.Top.Of(3, lambda x, y: x < y))
self.assertCompatible(typehints.Iterable[int],
@@ -1595,7 +1597,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 'n' >> beam.Create(list('testing')).with_output_types(str)
+ | beam.Create(list('testing')).with_output_types(str)
| 'acii top' >> combine.Top.Of(3, lambda x, y: x < y))
self.assertCompatible(typehints.Iterable[str], d.element_type)
@@ -1605,7 +1607,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_per_key_pipeline_checking_violated(self):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'n' >> beam.Create(range(100)).with_output_types(int)
+ | beam.Create(range(100)).with_output_types(int)
| 'num + 1' >> beam.Map(lambda x: x + 1).with_output_types(int)
| 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
@@ -1616,8 +1618,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_per_key_pipeline_checking_satisfied(self):
d = (self.p
- | 'n' >> beam.Create(range(100)).with_output_types(int)
- | 'group mod 3' >> (beam.Map(lambda x: (x % 3, x))
+ | beam.Create(range(100)).with_output_types(int)
+ | ('group mod 3' >> beam.Map(lambda x: (x % 3, x))
.with_output_types(typehints.KV[int, int]))
| 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
@@ -1630,8 +1632,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 'n' >> beam.Create(range(21))
- | 'group mod 3' >> (beam.Map(lambda x: (x % 3, x))
+ | beam.Create(range(21))
+ | ('group mod 3' >> beam.Map(lambda x: (x % 3, x))
.with_output_types(typehints.KV[int, int]))
| 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
@@ -1642,7 +1644,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_sample_globally_pipeline_satisfied(self):
d = (self.p
- | 'm' >> beam.Create([2, 2, 3, 3]).with_output_types(int)
+ | beam.Create([2, 2, 3, 3]).with_output_types(int)
| 'sample' >> combine.Sample.FixedSizeGlobally(3))
self.assertCompatible(typehints.Iterable[int], d.element_type)
@@ -1658,7 +1660,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 'm' >> beam.Create([2, 2, 3, 3]).with_output_types(int)
+ | beam.Create([2, 2, 3, 3]).with_output_types(int)
| 'sample' >> combine.Sample.FixedSizeGlobally(2))
self.assertCompatible(typehints.Iterable[int], d.element_type)
@@ -1672,7 +1674,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_sample_per_key_pipeline_satisfied(self):
d = (self.p
- | 'm' >> (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
+ | (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
.with_output_types(typehints.KV[int, int]))
| 'sample' >> combine.Sample.FixedSizePerKey(2))
@@ -1691,7 +1693,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 'm' >> (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
+ | (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
.with_output_types(typehints.KV[int, int]))
| 'sample' >> combine.Sample.FixedSizePerKey(1))
@@ -1708,8 +1710,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_to_list_pipeline_check_satisfied(self):
d = (self.p
- | 'c' >> beam.Create((1, 2, 3, 4)).with_output_types(int)
- | 'to list' >> combine.ToList())
+ | beam.Create((1, 2, 3, 4)).with_output_types(int)
+ | combine.ToList())
self.assertCompatible(typehints.List[int], d.element_type)
@@ -1724,8 +1726,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 'c' >> beam.Create(list('test')).with_output_types(str)
- | 'to list' >> combine.ToList())
+ | beam.Create(list('test')).with_output_types(str)
+ | combine.ToList())
self.assertCompatible(typehints.List[str], d.element_type)
@@ -1739,8 +1741,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_to_dict_pipeline_check_violated(self):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'd' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
- | 'to dict' >> combine.ToDict())
+ | beam.Create([1, 2, 3, 4]).with_output_types(int)
+ | combine.ToDict())
self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
"requires Tuple[TypeVariable[K], "
@@ -1751,9 +1753,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_to_dict_pipeline_check_satisfied(self):
d = (self.p
| beam.Create(
- 'd',
[(1, 2), (3, 4)]).with_output_types(typehints.Tuple[int, int])
- | 'to dict' >> combine.ToDict())
+ | combine.ToDict())
self.assertCompatible(typehints.Dict[int, int], d.element_type)
assert_that(d, equal_to([{1: 2, 3: 4}]))
@@ -1763,9 +1764,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 'd' >> (beam.Create([('1', 2), ('3', 4)])
+ | (beam.Create([('1', 2), ('3', 4)])
.with_output_types(typehints.Tuple[str, int]))
- | 'to dict' >> combine.ToDict())
+ | combine.ToDict())
self.assertCompatible(typehints.Dict[str, int], d.element_type)
assert_that(d, equal_to([{'1': 2, '3': 4}]))
@@ -1776,7 +1777,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(TypeError) as e:
(self.p
- | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+ | beam.Create([1, 2, 3]).with_output_types(int)
| 'len' >> beam.Map(lambda x: len(x)).with_output_types(int))
self.p.run()
@@ -1799,7 +1800,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
beam.core.GroupByKeyOnly().infer_output_type(typehints.KV[str, int]))
def test_pipeline_inference(self):
- created = self.p | 'c' >> beam.Create(['a', 'b', 'c'])
+ created = self.p | beam.Create(['a', 'b', 'c'])
mapped = created | 'pair with 1' >> beam.Map(lambda x: (x, 1))
grouped = mapped | beam.GroupByKey()
self.assertEqual(str, created.element_type)
@@ -1810,7 +1811,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_inferred_bad_kv_type(self):
with self.assertRaises(typehints.TypeCheckError) as e:
_ = (self.p
- | 't' >> beam.Create(['a', 'b', 'c'])
+ | beam.Create(['a', 'b', 'c'])
| 'ungroupable' >> beam.Map(lambda x: (x, 0, 1.0))
| beam.GroupByKey())
@@ -1821,11 +1822,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_type_inference_command_line_flag_toggle(self):
self.p.options.view_as(TypeOptions).pipeline_type_check = False
- x = self.p | 't' >> beam.Create([1, 2, 3, 4])
+ x = self.p | 'c1' >> beam.Create([1, 2, 3, 4])
self.assertIsNone(x.element_type)
self.p.options.view_as(TypeOptions).pipeline_type_check = True
- x = self.p | 'm' >> beam.Create([1, 2, 3, 4])
+ x = self.p | 'c2' >> beam.Create([1, 2, 3, 4])
self.assertEqual(int, x.element_type)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index bbb7787..4564cf9 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -220,7 +220,8 @@ def assert_that(actual, matcher, label='assert_that'):
class AssertThat(PTransform):
def apply(self, pipeline):
- return pipeline | 'singleton' >> Create([None]) | Map(match, AllOf(actual))
+ return pipeline | 'singleton' >> Create([None]) | Map(match,
+ AllOf(actual))
def default_label(self):
return label
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 4e1ab68..f2e8f12 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -177,7 +177,7 @@ class SideInputTest(unittest.TestCase):
bad_side_input = p | 'bad_side' >> beam.Create(['z'])
with self.assertRaises(typehints.TypeCheckError):
- main_input | 'again' >> beam.Map(repeat, pvalue.AsSingleton(bad_side_input))
+ main_input | 'bis' >> beam.Map(repeat, pvalue.AsSingleton(bad_side_input))
def test_deferred_side_input_iterable(self):
@typehints.with_input_types(str, typehints.Iterable[str])