You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/23 23:47:16 UTC
[10/12] incubator-beam git commit: Lint fixes.
Lint fixes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b15d35ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b15d35ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b15d35ca
Branch: refs/heads/python-sdk
Commit: b15d35ca6e585e75153e05d96403336889cc6894
Parents: 2a59a12
Author: Robert Bradshaw <ro...@google.com>
Authored: Fri Jul 22 18:35:22 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:46 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/dataflow_test.py | 7 +-
.../complete/juliaset/juliaset/juliaset.py | 9 +-
.../apache_beam/examples/complete/tfidf_test.py | 2 +-
.../examples/cookbook/bigquery_side_input.py | 14 +-
.../cookbook/bigquery_side_input_test.py | 4 +-
.../examples/cookbook/bigquery_tornadoes.py | 6 +-
.../apache_beam/examples/cookbook/bigshuffle.py | 5 +-
.../apache_beam/examples/cookbook/filters.py | 2 +-
.../apache_beam/examples/snippets/snippets.py | 2 +-
.../examples/snippets/snippets_test.py | 8 +-
sdks/python/apache_beam/examples/wordcount.py | 2 +-
.../apache_beam/examples/wordcount_debugging.py | 2 +-
.../apache_beam/examples/wordcount_minimal.py | 2 +-
sdks/python/apache_beam/io/bigquery.py | 4 +-
sdks/python/apache_beam/pipeline_test.py | 4 +-
.../apache_beam/transforms/combiners_test.py | 4 +-
sdks/python/apache_beam/transforms/core.py | 7 +-
.../apache_beam/transforms/ptransform_test.py | 161 ++++++++++---------
sdks/python/apache_beam/transforms/util.py | 3 +-
.../typehints/typed_pipeline_test.py | 2 +-
20 files changed, 127 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/dataflow_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
index bf66851..cc3a526 100644
--- a/sdks/python/apache_beam/dataflow_test.py
+++ b/sdks/python/apache_beam/dataflow_test.py
@@ -114,8 +114,8 @@ class DataflowTest(unittest.TestCase):
words = pipeline | 'SomeWords' >> Create(words_list)
prefix = 'zyx'
suffix = pipeline | 'SomeString' >> Create(['xyz']) # side in
- result = words | 'DecorateWordsDoFn' >> ParDo(SomeDoFn(), prefix,
- suffix=AsSingleton(suffix))
+ result = words | 'DecorateWordsDoFn' >> ParDo(
+ SomeDoFn(), prefix, suffix=AsSingleton(suffix))
assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
pipeline.run()
@@ -179,8 +179,7 @@ class DataflowTest(unittest.TestCase):
pipeline = Pipeline('DirectPipelineRunner')
pcol = pipeline | 'start' >> Create([1, 2])
side = pipeline | 'side' >> Create([]) # 0 values in side input.
- result = (
- pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side, 10)))
+ result = pcol | FlatMap(lambda x, s: [x * s], AsSingleton(side, 10))
assert_that(result, equal_to([10, 20]))
pipeline.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 56696c3..1445fbe 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -105,11 +105,12 @@ def run(argv=None): # pylint: disable=missing-docstring
# Group each coordinate triplet by its x value, then write the coordinates to
# the output file with an x-coordinate grouping per line.
# pylint: disable=expression-not-assigned
- (coordinates | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i)))
- | 'x coord' >> beam.GroupByKey() | beam.Map(
- 'format',
+ (coordinates
+ | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i)))
+ | 'x coord' >> beam.GroupByKey()
+ | 'format' >> beam.Map(
lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords))
- | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output)))
+ | beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output)))
# pylint: enable=expression-not-assigned
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index ee7e534..f30b832 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -56,7 +56,7 @@ class TfIdfTest(unittest.TestCase):
result = (
uri_to_line
| tfidf.TfIdf()
- | 'flatten' >> beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
+ | beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
beam.assert_that(result, beam.equal_to(EXPECTED_RESULTS))
# Run the pipeline. Note that the assert_that above adds to the pipeline
# a check that the result PCollection contains expected values. To actually
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index 1db4a1e..2099e48 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -101,11 +101,12 @@ def run(argv=None):
ignore_corpus = known_args.ignore_corpus
ignore_word = known_args.ignore_word
- pcoll_corpus = p | beam.Read('read corpus',
- beam.io.BigQuerySource(query=query_corpus))
- pcoll_word = p | beam.Read('read words',
- beam.io.BigQuerySource(query=query_word))
- pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create([ignore_corpus])
+ pcoll_corpus = p | 'read corpus' >> beam.io.Read(
+ beam.io.BigQuerySource(query=query_corpus))
+ pcoll_word = p | 'read_words' >> beam.Read(
+ beam.io.BigQuerySource(query=query_word))
+ pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create(
+ [ignore_corpus])
pcoll_ignore_word = p | 'create_ignore_word' >> beam.Create([ignore_word])
pcoll_group_ids = p | 'create groups' >> beam.Create(group_ids)
@@ -113,8 +114,7 @@ def run(argv=None):
pcoll_ignore_corpus, pcoll_ignore_word)
# pylint:disable=expression-not-assigned
- pcoll_groups | beam.io.Write('WriteToText',
- beam.io.TextFileSink(known_args.output))
+ pcoll_groups | beam.io.Write(beam.io.TextFileSink(known_args.output))
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index 215aafa..e2b20f3 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -35,8 +35,8 @@ class BigQuerySideInputTest(unittest.TestCase):
{'f': 'corpus2'},
{'f': 'corpus3'}])
words_pcoll = p | 'create_words' >> beam.Create([{'f': 'word1'},
- {'f': 'word2'},
- {'f': 'word3'}])
+ {'f': 'word2'},
+ {'f': 'word3'}])
ignore_corpus_pcoll = p | 'create_ignore_corpus' >> beam.Create(['corpus1'])
ignore_word_pcoll = p | 'create_ignore_word' >> beam.Create(['word1'])
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index cdaee36..6e1326c 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -53,11 +53,11 @@ def count_tornadoes(input_data):
"""
return (input_data
- | beam.FlatMap(
- 'months with tornadoes',
+ | 'months with tornadoes' >> beam.FlatMap(
lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
| 'monthly count' >> beam.CombinePerKey(sum)
- | 'format' >> beam.Map(lambda (k, v): {'month': k, 'tornado_count': v}))
+ | 'format' >> beam.Map(
+ lambda (k, v): {'month': k, 'tornado_count': v}))
def run(argv=None):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index c29a038..f7070dc 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -59,8 +59,9 @@ def run(argv=None):
# Count the occurrences of each word.
output = (lines
- | 'split' >> beam.Map(lambda x: (x[:10], x[10:99])
- ).with_output_types(beam.typehints.KV[str, str])
+ | 'split' >> beam.Map(
+ lambda x: (x[:10], x[10:99]))
+ .with_output_types(beam.typehints.KV[str, str])
| 'group' >> beam.GroupByKey()
| beam.FlatMap(
'format',
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/filters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py
index b19b566..efd0ba7 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters.py
@@ -88,7 +88,7 @@ def run(argv=None):
p = beam.Pipeline(argv=pipeline_args)
- input_data = p | 'input' >> beam.Read(beam.io.BigQuerySource(known_args.input))
+ input_data = p | beam.Read(beam.io.BigQuerySource(known_args.input))
# pylint: disable=expression-not-assigned
(filter_cold_days(input_data, known_args.month_filter)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 9d1df82..9f3d6e1 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -307,7 +307,7 @@ def pipeline_options_command_line(argv):
p = beam.Pipeline(argv=pipeline_args)
lines = p | beam.io.Read('ReadFromText',
beam.io.TextFileSource(known_args.input))
- lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
+ lines | beam.io.Write(beam.io.TextFileSink(known_args.output))
# [END pipeline_options_command_line]
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 9eba46a..edc0a17 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -101,6 +101,7 @@ class ParDoTest(unittest.TestCase):
self.assertEqual({'A', 'C'}, set(all_capitals))
def test_pardo_with_label(self):
+ # pylint: disable=line-too-long
words = ['aa', 'bbc', 'defg']
# [START model_pardo_with_label]
result = words | 'CountUniqueLetters' >> beam.Map(lambda word: len(set(word)))
@@ -127,10 +128,9 @@ class ParDoTest(unittest.TestCase):
small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)
# A single deferred side input.
- larger_than_average = (words
- | 'large' >> beam.FlatMap(filter_using_length,
- lower_bound=pvalue.AsSingleton(
- avg_word_len)))
+ larger_than_average = (words | 'large' >> beam.FlatMap(
+ filter_using_length,
+ lower_bound=pvalue.AsSingleton(avg_word_len)))
# Mix and match.
small_but_nontrivial = words | beam.FlatMap(filter_using_length,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 4744352..096e508 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -82,7 +82,7 @@ def run(argv=None):
# Count the occurrences of each word.
counts = (lines
| 'split' >> (beam.ParDo(WordExtractingDoFn())
- .with_output_types(unicode))
+ .with_output_types(unicode))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index e008b48..473a486 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -98,7 +98,7 @@ class CountWords(beam.PTransform):
def apply(self, pcoll):
return (pcoll
| 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
- .with_output_types(unicode))
+ .with_output_types(unicode))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index ce5b644..4073f7b 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -98,7 +98,7 @@ def run(argv=None):
# Count the occurrences of each word.
counts = (lines
| 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
- .with_output_types(unicode))
+ .with_output_types(unicode))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
index f789312..50c2eaf 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -45,8 +45,8 @@ Map transform will get on each call *one* row of the main table and *all* rows
of the side table. The execution framework may use some caching techniques to
share the side inputs between calls in order to avoid excessive reading::
- main_table = pipeline | 'very_big_table' >> beam.io.Read(beam.io.BigQuerySource()
- side_table = pipeline | 'not_big_table' >> beam.io.Read(beam.io.BigQuerySource()
+ main_table = pipeline | 'very_big' >> beam.io.Read(beam.io.BigQuerySource())
+ side_table = pipeline | 'not_big' >> beam.io.Read(beam.io.BigQuerySource())
results = (
main_table
| beam.Map('process data',
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 327f26c..8a0d246 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -217,8 +217,8 @@ class PipelineTest(unittest.TestCase):
dupes = (
biglist
| 'oom:addone' >> Map(lambda x: (x, 1))
- | 'oom:dupes' >> FlatMap(create_dupes,
- AsIter(biglist)).with_outputs('side', main='main'))
+ | 'oom:dupes' >> FlatMap(
+ create_dupes, AsIter(biglist)).with_outputs('side', main='main'))
result = (
(dupes.side, dupes.main, dupes.side)
| 'oom:flatten' >> Flatten()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index c970382..bfe168d 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -219,8 +219,8 @@ class CombineTest(unittest.TestCase):
return main | Map(lambda _, s: s, side)
p = Pipeline('DirectPipelineRunner')
- result1 = p | 'label1' >> Create([]) | 'L1' >> CombineWithSideInput()
- result2 = p | 'label2' >> Create([1, 2, 3, 4]) | 'L2' >> CombineWithSideInput()
+ result1 = p | 'i1' >> Create([]) | 'c1' >> CombineWithSideInput()
+ result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> CombineWithSideInput()
assert_that(result1, equal_to([0]), label='r1')
assert_that(result2, equal_to([10]), label='r2')
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 44a6d29..5e6aafc 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -814,9 +814,9 @@ class CombineGlobally(PTransform):
return transform
combined = (pcoll
- | 'KeyWithVoid' >> add_input_types(Map(lambda v: (None, v))
- .with_output_types(
- KV[None, pcoll.element_type]))
+ | 'KeyWithVoid' >> add_input_types(
+ Map(lambda v: (None, v)).with_output_types(
+ KV[None, pcoll.element_type]))
| CombinePerKey(
'CombinePerKey', self.fn, *self.args, **self.kwargs)
| 'UnKey' >> Map(lambda (k, v): v))
@@ -1044,6 +1044,7 @@ class GroupByKey(PTransform):
KV[key_type, Iterable[typehints.WindowedValue[value_type]]])
gbk_output_type = KV[key_type, Iterable[value_type]]
+ # pylint: disable=bad-continuation
return (pcoll
| 'reify_windows' >> (ParDo(self.ReifyWindows())
.with_output_types(reify_output_type))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 3a71ec3..992f944 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -288,7 +288,7 @@ class PTransformTest(unittest.TestCase):
vals_2 = [2, 4, 6, 8, 10, 12, 14]
pipeline = Pipeline('DirectPipelineRunner')
pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
- [('b', x) for x in vals_2]))
+ [('b', x) for x in vals_2]))
result = pcoll | 'mean' >> beam.CombinePerKey(self._MeanCombineFn())
assert_that(result, equal_to([('a', sum(vals_1) / len(vals_1)),
('b', sum(vals_2) / len(vals_2))]))
@@ -299,7 +299,7 @@ class PTransformTest(unittest.TestCase):
vals_2 = [2, 4, 6, 8, 10, 12, 14]
pipeline = Pipeline('DirectPipelineRunner')
pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
- [('b', x) for x in vals_2]))
+ [('b', x) for x in vals_2]))
result = pcoll | beam.CombinePerKey(sum)
assert_that(result, equal_to([('a', sum(vals_1)), ('b', sum(vals_2))]))
pipeline.run()
@@ -309,7 +309,7 @@ class PTransformTest(unittest.TestCase):
vals_2 = [2, 4, 6, 8, 10, 12, 14]
pipeline = Pipeline('DirectPipelineRunner')
pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
- [('b', x) for x in vals_2]))
+ [('b', x) for x in vals_2]))
divisor = pipeline | 'divisor' >> beam.Create([2])
result = pcoll | beam.CombinePerKey(
lambda vals, d: max(v for v in vals if v % d == 0),
@@ -513,13 +513,14 @@ class PTransformTest(unittest.TestCase):
def test_apply_to_list(self):
self.assertItemsEqual(
[1, 2, 3], [0, 1, 2] | 'add_one' >> beam.Map(lambda x: x + 1))
- self.assertItemsEqual([1], [0, 1, 2] | 'odd' >> beam.Filter(lambda x: x % 2))
+ self.assertItemsEqual([1],
+ [0, 1, 2] | 'odd' >> beam.Filter(lambda x: x % 2))
self.assertItemsEqual([1, 2, 100, 3],
- ([1, 2, 3], [100]) | 'flat' >> beam.Flatten())
+ ([1, 2, 3], [100]) | beam.Flatten())
join_input = ([('k', 'a')],
[('k', 'b'), ('k', 'c')])
self.assertItemsEqual([('k', (['a'], ['b', 'c']))],
- join_input | 'join' >> beam.CoGroupByKey())
+ join_input | beam.CoGroupByKey())
def test_multi_input_ptransform(self):
class DisjointUnion(PTransform):
@@ -776,9 +777,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
| 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
- | 'score' >> (beam.FlatMap(lambda x: [1] if x == 't' else [2])
+ | ('score' >> beam.FlatMap(lambda x: [1] if x == 't' else [2])
.with_input_types(str).with_output_types(int))
- | 'upper' >> (beam.FlatMap(lambda x: [x.upper()])
+ | ('upper' >> beam.FlatMap(lambda x: [x.upper()])
.with_input_types(str).with_output_types(str)))
self.assertEqual("Type hint violation for 'upper': "
@@ -866,7 +867,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_filter_type_checks_using_type_hints_method(self):
# No error should be raised if this type-checks properly.
d = (self.p
- | 'strs' >> beam.Create(['1', '2', '3', '4', '5']).with_output_types(str)
+ | beam.Create(['1', '2', '3', '4', '5']).with_output_types(str)
| 'to int' >> beam.Map(lambda x: int(x))
.with_input_types(str).with_output_types(int)
| 'below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))
@@ -904,9 +905,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_group_by_key_only_output_type_deduction(self):
d = (self.p
| 'str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
- | 'pair' >> (beam.Map(lambda x: (x, ord(x)))
+ | ('pair' >> beam.Map(lambda x: (x, ord(x)))
.with_output_types(typehints.KV[str, str]))
- | 'O' >> beam.GroupByKeyOnly())
+ | beam.GroupByKeyOnly())
# Output type should correctly be deduced.
# GBK-only should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
@@ -916,9 +917,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_group_by_key_output_type_deduction(self):
d = (self.p
| 'str' >> beam.Create(range(20)).with_output_types(int)
- | 'pair negative' >> (beam.Map(lambda x: (x % 5, -x))
+ | ('pair negative' >> beam.Map(lambda x: (x % 5, -x))
.with_output_types(typehints.KV[int, int]))
- | 'T' >> beam.GroupByKey())
+ | beam.GroupByKey())
# Output type should correctly be deduced.
# GBK should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
@@ -929,8 +930,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# GBK will be passed raw int's here instead of some form of KV[A, B].
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 's' >> beam.Create([1, 2, 3]).with_output_types(int)
- | 'F' >> beam.GroupByKeyOnly())
+ | beam.Create([1, 2, 3]).with_output_types(int)
+ | beam.GroupByKeyOnly())
self.assertEqual("Input type hint violation at F: "
"expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -942,9 +943,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# aliased to Tuple[int, str].
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 's' >> (beam.Create(range(5))
+ | (beam.Create(range(5))
.with_output_types(typehints.Iterable[int]))
- | 'T' >> beam.GroupByKey())
+ | beam.GroupByKey())
self.assertEqual("Input type hint violation at T: "
"expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -973,7 +974,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
(self.p
| 'nums' >> beam.Create(range(5)).with_output_types(int)
| 'mod dup' >> beam.Map(lambda x: (x % 2, x))
- | 'G' >> beam.GroupByKeyOnly())
+ | beam.GroupByKeyOnly())
self.assertEqual('Pipeline type checking is enabled, however no output '
'type-hint was found for the PTransform '
@@ -1092,10 +1093,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# ParDo should receive an instance of type 'str', however an 'int' will be
# passed instead.
with self.assertRaises(typehints.TypeCheckError) as e:
- (self.p | 'n' >> beam.Create([1, 2, 3])
- | 'to int' >> (beam.FlatMap(lambda x: [int(x)])
- .with_input_types(str).with_output_types(int))
- )
+ (self.p
+ | beam.Create([1, 2, 3])
+ | ('to int' >> beam.FlatMap(lambda x: [int(x)])
+ .with_input_types(str).with_output_types(int)))
self.p.run()
self.assertStartswith(
@@ -1111,8 +1112,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'n' >> beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
- | 'add' >> (beam.FlatMap(lambda (x, y): [x + y])
+ | beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
+ | ('add' >> beam.FlatMap(lambda (x, y): [x + y])
.with_input_types(typehints.Tuple[int, int]).with_output_types(int))
)
self.p.run()
@@ -1136,8 +1137,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
lambda x: [float(x)]).with_input_types(int).with_output_types(
int).get_type_hints()
with self.assertRaises(typehints.TypeCheckError) as e:
- (self.p | 'n' >> beam.Create([1, 2, 3])
- | 'to int' >> (beam.FlatMap(lambda x: [float(x)])
+ (self.p
+ | beam.Create([1, 2, 3])
+ | ('to int' >> beam.FlatMap(lambda x: [float(x)])
.with_input_types(int).with_output_types(int))
)
self.p.run()
@@ -1159,8 +1161,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# of 'int' will be generated instead.
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'n' >> beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
- | 'swap' >> (beam.FlatMap(lambda (x, y): [x + y])
+ | beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
+ | ('swap' >> beam.FlatMap(lambda (x, y): [x + y])
.with_input_types(typehints.Tuple[int, float])
.with_output_types(typehints.Tuple[float, int]))
)
@@ -1183,7 +1185,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
return a + b
with self.assertRaises(typehints.TypeCheckError) as e:
- (self.p | 't' >> beam.Create([1, 2, 3, 4]) | 'add 1' >> beam.Map(add, 1.0))
+ (self.p | beam.Create([1, 2, 3, 4]) | 'add 1' >> beam.Map(add, 1.0))
self.p.run()
self.assertStartswith(
@@ -1199,8 +1201,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 't' >> beam.Create([1, 2, 3, 4])
- | 'add 1' >> (beam.Map(lambda x, one: x + one, 1.0)
+ | beam.Create([1, 2, 3, 4])
+ | ('add 1' >> beam.Map(lambda x, one: x + one, 1.0)
.with_input_types(int, int)
.with_output_types(float)))
self.p.run()
@@ -1305,8 +1307,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_combine_pipeline_type_check_using_methods(self):
d = (self.p
- | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
- | 'concat' >> (beam.CombineGlobally(lambda s: ''.join(s))
+ | beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+ | ('concat' >> beam.CombineGlobally(lambda s: ''.join(s))
.with_input_types(str).with_output_types(str)))
def matcher(expected):
@@ -1321,8 +1323,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 's' >> beam.Create(range(5)).with_output_types(int)
- | 'sum' >> (beam.CombineGlobally(lambda s: sum(s))
+ | beam.Create(range(5)).with_output_types(int)
+ | ('sum' >> beam.CombineGlobally(lambda s: sum(s))
.with_input_types(int).with_output_types(int)))
assert_that(d, equal_to([10]))
@@ -1331,8 +1333,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_combine_pipeline_type_check_violation_using_methods(self):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'e' >> beam.Create(range(3)).with_output_types(int)
- | 'sort join' >> (beam.CombineGlobally(lambda s: ''.join(sorted(s)))
+ | beam.Create(range(3)).with_output_types(int)
+ | ('sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
.with_input_types(str).with_output_types(str)))
self.assertEqual("Input type hint violation at sort join: "
@@ -1345,8 +1347,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'e' >> beam.Create(range(3)).with_output_types(int)
- | 'sort join' >> (beam.CombineGlobally(lambda s: ''.join(sorted(s)))
+ | beam.Create(range(3)).with_output_types(int)
+ | ('sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
.with_input_types(str).with_output_types(str)))
self.p.run()
@@ -1427,8 +1429,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_mean_per_key_pipeline_checking_satisfied(self):
d = (self.p
- | 'c' >> beam.Create(range(5)).with_output_types(int)
- | 'even group' >> (beam.Map(lambda x: (not x % 2, x))
+ | beam.Create(range(5)).with_output_types(int)
+ | ('even group' >> beam.Map(lambda x: (not x % 2, x))
.with_output_types(typehints.KV[bool, int]))
| 'even mean' >> combine.Mean.PerKey())
@@ -1439,8 +1441,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_mean_per_key_pipeline_checking_violated(self):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'e' >> beam.Create(map(str, range(5))).with_output_types(str)
- | 'upper pair' >> (beam.Map(lambda x: (x.upper(), x))
+ | beam.Create(map(str, range(5))).with_output_types(str)
+ | ('upper pair' >> beam.Map(lambda x: (x.upper(), x))
.with_output_types(typehints.KV[str, str]))
| 'even mean' >> combine.Mean.PerKey())
self.p.run()
@@ -1455,8 +1457,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 'c' >> beam.Create(range(5)).with_output_types(int)
- | 'odd group' >> (beam.Map(lambda x: (bool(x % 2), x))
+ | beam.Create(range(5)).with_output_types(int)
+ | ('odd group' >> beam.Map(lambda x: (bool(x % 2), x))
.with_output_types(typehints.KV[bool, int]))
| 'odd mean' >> combine.Mean.PerKey())
@@ -1470,8 +1472,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'c' >> beam.Create(range(5)).with_output_types(int)
- | 'odd group' >> (beam.Map(lambda x: (x, str(bool(x % 2))))
+ | beam.Create(range(5)).with_output_types(int)
+ | ('odd group' >> beam.Map(lambda x: (x, str(bool(x % 2))))
.with_output_types(typehints.KV[int, str]))
| 'odd mean' >> combine.Mean.PerKey())
self.p.run()
@@ -1514,8 +1516,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_count_perkey_pipeline_type_checking_satisfied(self):
d = (self.p
- | 'p' >> beam.Create(range(5)).with_output_types(int)
- | 'even group' >> (beam.Map(lambda x: (not x % 2, x))
+ | beam.Create(range(5)).with_output_types(int)
+ | ('even group' >> beam.Map(lambda x: (not x % 2, x))
.with_output_types(typehints.KV[bool, int]))
| 'count int' >> combine.Count.PerKey())
@@ -1526,7 +1528,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_count_perkey_pipeline_type_checking_violated(self):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'p' >> beam.Create(range(5)).with_output_types(int)
+ | beam.Create(range(5)).with_output_types(int)
| 'count int' >> combine.Count.PerKey())
self.assertEqual("Input type hint violation at GroupByKey: "
@@ -1538,7 +1540,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 'c' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+ | beam.Create(['t', 'e', 's', 't']).with_output_types(str)
| 'dup key' >> beam.Map(lambda x: (x, x))
.with_output_types(typehints.KV[str, str])
| 'count dups' >> combine.Count.PerKey())
@@ -1549,7 +1551,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_count_perelement_pipeline_type_checking_satisfied(self):
d = (self.p
- | 'w' >> beam.Create([1, 1, 2, 3]).with_output_types(int)
+ | beam.Create([1, 1, 2, 3]).with_output_types(int)
| 'count elems' >> combine.Count.PerElement())
self.assertCompatible(typehints.KV[int, int], d.element_type)
@@ -1561,7 +1563,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'f' >> beam.Create([1, 1, 2, 3])
+ | beam.Create([1, 1, 2, 3])
| 'count elems' >> combine.Count.PerElement())
self.assertEqual('Pipeline type checking is enabled, however no output '
@@ -1573,7 +1575,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 'w' >> beam.Create([True, True, False, True, True])
+ | beam.Create([True, True, False, True, True])
.with_output_types(bool)
| 'count elems' >> combine.Count.PerElement())
@@ -1583,7 +1585,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_top_of_pipeline_checking_satisfied(self):
d = (self.p
- | 'n' >> beam.Create(range(5, 11)).with_output_types(int)
+ | beam.Create(range(5, 11)).with_output_types(int)
| 'top 3' >> combine.Top.Of(3, lambda x, y: x < y))
self.assertCompatible(typehints.Iterable[int],
@@ -1595,7 +1597,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 'n' >> beam.Create(list('testing')).with_output_types(str)
+ | beam.Create(list('testing')).with_output_types(str)
| 'acii top' >> combine.Top.Of(3, lambda x, y: x < y))
self.assertCompatible(typehints.Iterable[str], d.element_type)
@@ -1605,7 +1607,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_per_key_pipeline_checking_violated(self):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'n' >> beam.Create(range(100)).with_output_types(int)
+ | beam.Create(range(100)).with_output_types(int)
| 'num + 1' >> beam.Map(lambda x: x + 1).with_output_types(int)
| 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
@@ -1616,8 +1618,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_per_key_pipeline_checking_satisfied(self):
d = (self.p
- | 'n' >> beam.Create(range(100)).with_output_types(int)
- | 'group mod 3' >> (beam.Map(lambda x: (x % 3, x))
+ | beam.Create(range(100)).with_output_types(int)
+ | ('group mod 3' >> beam.Map(lambda x: (x % 3, x))
.with_output_types(typehints.KV[int, int]))
| 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
@@ -1630,8 +1632,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 'n' >> beam.Create(range(21))
- | 'group mod 3' >> (beam.Map(lambda x: (x % 3, x))
+ | beam.Create(range(21))
+ | ('group mod 3' >> beam.Map(lambda x: (x % 3, x))
.with_output_types(typehints.KV[int, int]))
| 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
@@ -1642,7 +1644,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_sample_globally_pipeline_satisfied(self):
d = (self.p
- | 'm' >> beam.Create([2, 2, 3, 3]).with_output_types(int)
+ | beam.Create([2, 2, 3, 3]).with_output_types(int)
| 'sample' >> combine.Sample.FixedSizeGlobally(3))
self.assertCompatible(typehints.Iterable[int], d.element_type)
@@ -1658,7 +1660,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 'm' >> beam.Create([2, 2, 3, 3]).with_output_types(int)
+ | beam.Create([2, 2, 3, 3]).with_output_types(int)
| 'sample' >> combine.Sample.FixedSizeGlobally(2))
self.assertCompatible(typehints.Iterable[int], d.element_type)
@@ -1672,7 +1674,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_sample_per_key_pipeline_satisfied(self):
d = (self.p
- | 'm' >> (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
+ | (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
.with_output_types(typehints.KV[int, int]))
| 'sample' >> combine.Sample.FixedSizePerKey(2))
@@ -1691,7 +1693,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 'm' >> (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
+ | (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
.with_output_types(typehints.KV[int, int]))
| 'sample' >> combine.Sample.FixedSizePerKey(1))
@@ -1708,8 +1710,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_to_list_pipeline_check_satisfied(self):
d = (self.p
- | 'c' >> beam.Create((1, 2, 3, 4)).with_output_types(int)
- | 'to list' >> combine.ToList())
+ | beam.Create((1, 2, 3, 4)).with_output_types(int)
+ | combine.ToList())
self.assertCompatible(typehints.List[int], d.element_type)
@@ -1724,8 +1726,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 'c' >> beam.Create(list('test')).with_output_types(str)
- | 'to list' >> combine.ToList())
+ | beam.Create(list('test')).with_output_types(str)
+ | combine.ToList())
self.assertCompatible(typehints.List[str], d.element_type)
@@ -1739,8 +1741,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_to_dict_pipeline_check_violated(self):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'd' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
- | 'to dict' >> combine.ToDict())
+ | beam.Create([1, 2, 3, 4]).with_output_types(int)
+ | combine.ToDict())
self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
"requires Tuple[TypeVariable[K], "
@@ -1751,9 +1753,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_to_dict_pipeline_check_satisfied(self):
d = (self.p
| beam.Create(
- 'd',
[(1, 2), (3, 4)]).with_output_types(typehints.Tuple[int, int])
- | 'to dict' >> combine.ToDict())
+ | combine.ToDict())
self.assertCompatible(typehints.Dict[int, int], d.element_type)
assert_that(d, equal_to([{1: 2, 3: 4}]))
@@ -1763,9 +1764,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 'd' >> (beam.Create([('1', 2), ('3', 4)])
+ | (beam.Create([('1', 2), ('3', 4)])
.with_output_types(typehints.Tuple[str, int]))
- | 'to dict' >> combine.ToDict())
+ | combine.ToDict())
self.assertCompatible(typehints.Dict[str, int], d.element_type)
assert_that(d, equal_to([{'1': 2, '3': 4}]))
@@ -1776,7 +1777,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(TypeError) as e:
(self.p
- | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+ | beam.Create([1, 2, 3]).with_output_types(int)
| 'len' >> beam.Map(lambda x: len(x)).with_output_types(int))
self.p.run()
@@ -1799,7 +1800,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
beam.core.GroupByKeyOnly().infer_output_type(typehints.KV[str, int]))
def test_pipeline_inference(self):
- created = self.p | 'c' >> beam.Create(['a', 'b', 'c'])
+ created = self.p | beam.Create(['a', 'b', 'c'])
mapped = created | 'pair with 1' >> beam.Map(lambda x: (x, 1))
grouped = mapped | beam.GroupByKey()
self.assertEqual(str, created.element_type)
@@ -1810,7 +1811,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_inferred_bad_kv_type(self):
with self.assertRaises(typehints.TypeCheckError) as e:
_ = (self.p
- | 't' >> beam.Create(['a', 'b', 'c'])
+ | beam.Create(['a', 'b', 'c'])
| 'ungroupable' >> beam.Map(lambda x: (x, 0, 1.0))
| beam.GroupByKey())
@@ -1821,11 +1822,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_type_inference_command_line_flag_toggle(self):
self.p.options.view_as(TypeOptions).pipeline_type_check = False
- x = self.p | 't' >> beam.Create([1, 2, 3, 4])
+ x = self.p | 'c1' >> beam.Create([1, 2, 3, 4])
self.assertIsNone(x.element_type)
self.p.options.view_as(TypeOptions).pipeline_type_check = True
- x = self.p | 'm' >> beam.Create([1, 2, 3, 4])
+ x = self.p | 'c2' >> beam.Create([1, 2, 3, 4])
self.assertEqual(int, x.element_type)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index bbb7787..4564cf9 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -220,7 +220,8 @@ def assert_that(actual, matcher, label='assert_that'):
class AssertThat(PTransform):
def apply(self, pipeline):
- return pipeline | 'singleton' >> Create([None]) | Map(match, AllOf(actual))
+ return pipeline | 'singleton' >> Create([None]) | Map(match,
+ AllOf(actual))
def default_label(self):
return label
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 4e1ab68..f2e8f12 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -177,7 +177,7 @@ class SideInputTest(unittest.TestCase):
bad_side_input = p | 'bad_side' >> beam.Create(['z'])
with self.assertRaises(typehints.TypeCheckError):
- main_input | 'again' >> beam.Map(repeat, pvalue.AsSingleton(bad_side_input))
+ main_input | 'bis' >> beam.Map(repeat, pvalue.AsSingleton(bad_side_input))
def test_deferred_side_input_iterable(self):
@typehints.with_input_types(str, typehints.Iterable[str])