You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/19 21:26:24 UTC
[3/4] beam git commit: Remove unneeded labels, and convert existing labels to UpperCamelCase.
Remove unneeded labels, and convert existing labels to UpperCamelCase.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d863e686
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d863e686
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d863e686
Branch: refs/heads/python-sdk
Commit: d863e6863b1487aa884151c9a317076dab8facd6
Parents: 0db60e4
Author: Ahmet Altay <al...@google.com>
Authored: Thu Jan 19 12:28:48 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Jan 19 12:28:48 2017 -0800
----------------------------------------------------------------------
.../apache_beam/examples/complete/tfidf.py | 38 +-
.../cookbook/bigquery_side_input_test.py | 13 +-
.../apache_beam/examples/cookbook/filters.py | 12 +-
.../examples/cookbook/mergecontacts.py | 16 +-
.../apache_beam/examples/streaming_wordcount.py | 10 +-
sdks/python/apache_beam/io/fileio_test.py | 12 +-
sdks/python/apache_beam/io/iobase.py | 12 +-
sdks/python/apache_beam/io/textio_test.py | 6 +-
sdks/python/apache_beam/runners/runner_test.py | 25 +-
.../apache_beam/transforms/ptransform_test.py | 513 +++++++++----------
10 files changed, 325 insertions(+), 332 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index c048cdd..367e275 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -42,9 +42,9 @@ def read_documents(pipeline, uris):
for uri in uris:
pcolls.append(
pipeline
- | 'read: %s' % uri >> ReadFromText(uri)
- | 'withkey: %s' % uri >> beam.Map(lambda v, uri: (uri, v), uri))
- return pcolls | 'flatten read pcolls' >> beam.Flatten()
+ | 'Read: %s' % uri >> ReadFromText(uri)
+ | 'WithKey: %s' % uri >> beam.Map(lambda v, uri: (uri, v), uri))
+ return pcolls | 'FlattenReadPColls' >> beam.Flatten()
class TfIdf(beam.PTransform):
@@ -61,9 +61,9 @@ class TfIdf(beam.PTransform):
# PCollection to use as side input.
total_documents = (
uri_to_content
- | 'get uris' >> beam.Keys()
- | 'get unique uris' >> beam.RemoveDuplicates()
- | ' count uris' >> beam.combiners.Count.Globally())
+ | 'GetUris 1' >> beam.Keys()
+ | 'GetUniqueUris' >> beam.RemoveDuplicates()
+ | 'CountUris' >> beam.combiners.Count.Globally())
# Create a collection of pairs mapping a URI to each of the words
# in the document associated with that URI.
@@ -73,35 +73,35 @@ class TfIdf(beam.PTransform):
uri_to_words = (
uri_to_content
- | 'split words' >> beam.FlatMap(split_into_words))
+ | 'SplitWords' >> beam.FlatMap(split_into_words))
# Compute a mapping from each word to the total number of documents
# in which it appears.
word_to_doc_count = (
uri_to_words
- | 'get unique words per doc' >> beam.RemoveDuplicates()
- | 'get words' >> beam.Values()
- | 'count docs per word' >> beam.combiners.Count.PerElement())
+ | 'GetUniqueWordsPerDoc' >> beam.RemoveDuplicates()
+ | 'GetWords' >> beam.Values()
+ | 'CountDocsPerWord' >> beam.combiners.Count.PerElement())
# Compute a mapping from each URI to the total number of words in the
# document associated with that URI.
uri_to_word_total = (
uri_to_words
- | ' get uris' >> beam.Keys()
- | 'count words in doc' >> beam.combiners.Count.PerElement())
+ | 'GetUris 2' >> beam.Keys()
+ | 'CountWordsInDoc' >> beam.combiners.Count.PerElement())
# Count, for each (URI, word) pair, the number of occurrences of that word
# in the document associated with the URI.
uri_and_word_to_count = (
uri_to_words
- | 'count word-doc pairs' >> beam.combiners.Count.PerElement())
+ | 'CountWord-DocPairs' >> beam.combiners.Count.PerElement())
# Adjust the above collection to a mapping from (URI, word) pairs to counts
# into an isomorphic mapping from URI to (word, count) pairs, to prepare
# for a join by the URI key.
uri_to_word_and_count = (
uri_and_word_to_count
- | 'shift keys' >> beam.Map(
+ | 'ShiftKeys' >> beam.Map(
lambda ((uri, word), count): (uri, (word, count))))
# Perform a CoGroupByKey (a sort of pre-join) on the prepared
@@ -118,7 +118,7 @@ class TfIdf(beam.PTransform):
# ... ]}
uri_to_word_and_count_and_total = (
{'word totals': uri_to_word_total, 'word counts': uri_to_word_and_count}
- | 'cogroup by uri' >> beam.CoGroupByKey())
+ | 'CoGroupByUri' >> beam.CoGroupByKey())
# Compute a mapping from each word to a (URI, term frequency) pair for each
# URI. A word's term frequency for a document is simply the number of times
@@ -134,7 +134,7 @@ class TfIdf(beam.PTransform):
word_to_uri_and_tf = (
uri_to_word_and_count_and_total
- | 'compute term frequencies' >> beam.FlatMap(compute_term_frequency))
+ | 'ComputeTermFrequencies' >> beam.FlatMap(compute_term_frequency))
# Compute a mapping from each word to its document frequency.
# A word's document frequency in a corpus is the number of
@@ -149,7 +149,7 @@ class TfIdf(beam.PTransform):
# DoFns in this way.
word_to_df = (
word_to_doc_count
- | 'compute doc frequencies' >> beam.Map(
+ | 'ComputeDocFrequencies' >> beam.Map(
lambda (word, count), total: (word, float(count) / total),
AsSingleton(total_documents)))
@@ -157,7 +157,7 @@ class TfIdf(beam.PTransform):
# each keyed on the word.
word_to_uri_and_tf_and_df = (
{'tf': word_to_uri_and_tf, 'df': word_to_df}
- | 'cogroup words by tf-df' >> beam.CoGroupByKey())
+ | 'CoGroupWordsByTf-df' >> beam.CoGroupByKey())
# Compute a mapping from each word to a (URI, TF-IDF) score for each URI.
# There are a variety of definitions of TF-IDF
@@ -172,7 +172,7 @@ class TfIdf(beam.PTransform):
word_to_uri_and_tfidf = (
word_to_uri_and_tf_and_df
- | 'compute tf-idf' >> beam.FlatMap(compute_tf_idf))
+ | 'ComputeTf-idf' >> beam.FlatMap(compute_tf_idf))
return word_to_uri_and_tfidf
http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index 66cab77..5869976 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -30,14 +30,13 @@ class BigQuerySideInputTest(unittest.TestCase):
def test_create_groups(self):
p = TestPipeline()
- group_ids_pcoll = p | 'create_group_ids' >> beam.Create(['A', 'B', 'C'])
- corpus_pcoll = p | 'create_corpus' >> beam.Create(
+ group_ids_pcoll = p | 'CreateGroupIds' >> beam.Create(['A', 'B', 'C'])
+ corpus_pcoll = p | 'CreateCorpus' >> beam.Create(
[{'f': 'corpus1'}, {'f': 'corpus2'}, {'f': 'corpus3'}])
- words_pcoll = p | 'create_words' >> beam.Create([{'f': 'word1'},
- {'f': 'word2'},
- {'f': 'word3'}])
- ignore_corpus_pcoll = p | 'create_ignore_corpus' >> beam.Create(['corpus1'])
- ignore_word_pcoll = p | 'create_ignore_word' >> beam.Create(['word1'])
+ words_pcoll = p | 'CreateWords' >> beam.Create(
+ [{'f': 'word1'}, {'f': 'word2'}, {'f': 'word3'}])
+ ignore_corpus_pcoll = p | 'CreateIgnoreCorpus' >> beam.Create(['corpus1'])
+ ignore_word_pcoll = p | 'CreateIgnoreWord' >> beam.Create(['word1'])
groups = bigquery_side_input.create_groups(group_ids_pcoll, corpus_pcoll,
words_pcoll, ignore_corpus_pcoll,
http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/examples/cookbook/filters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py
index 7c77b9d..d13d823 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters.py
@@ -53,21 +53,21 @@ def filter_cold_days(input_data, month_filter):
projection_fields = ['year', 'month', 'day', 'mean_temp']
fields_of_interest = (
input_data
- | 'projected' >> beam.Map(
+ | 'Projected' >> beam.Map(
lambda row: {f: row[f] for f in projection_fields}))
# Compute the global mean temperature.
global_mean = AsSingleton(
fields_of_interest
- | 'extract mean' >> beam.Map(lambda row: row['mean_temp'])
- | 'global mean' >> beam.combiners.Mean.Globally())
+ | 'ExtractMean' >> beam.Map(lambda row: row['mean_temp'])
+ | 'GlobalMean' >> beam.combiners.Mean.Globally())
# Filter to the rows representing days in the month of interest
# in which the mean daily temperature is below the global mean.
return (
fields_of_interest
- | 'desired month' >> beam.Filter(lambda row: row['month'] == month_filter)
- | 'below mean' >> beam.Filter(
+ | 'DesiredMonth' >> beam.Filter(lambda row: row['month'] == month_filter)
+ | 'BelowMean' >> beam.Filter(
lambda row, mean: row['mean_temp'] < mean, global_mean))
@@ -92,7 +92,7 @@ def run(argv=None):
# pylint: disable=expression-not-assigned
(filter_cold_days(input_data, known_args.month_filter)
- | 'save to BQ' >> beam.io.Write(beam.io.BigQuerySink(
+ | 'SaveToBQ' >> beam.io.Write(beam.io.BigQuerySink(
known_args.output,
schema='year:INTEGER,month:INTEGER,day:INTEGER,mean_temp:FLOAT',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 55bdc50..c880a9a 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -74,12 +74,12 @@ def run(argv=None, assert_results=None):
# quotes/backslashes, and convert it a PCollection of (key, value) pairs.
def read_kv_textfile(label, textfile):
return (p
- | 'read_%s' % label >> ReadFromText(textfile)
- | 'backslash_%s' % label >> beam.Map(
+ | 'Read: %s' % label >> ReadFromText(textfile)
+ | 'Backslash: %s' % label >> beam.Map(
lambda x: re.sub(r'\\', r'\\\\', x))
- | 'escape_quotes_%s' % label >> beam.Map(
+ | 'EscapeQuotes: %s' % label >> beam.Map(
lambda x: re.sub(r'"', r'\"', x))
- | 'split_%s' % label >> beam.Map(
+ | 'Split: %s' % label >> beam.Map(
lambda x: re.split(r'\t+', x, 1)))
# Read input databases.
@@ -107,13 +107,13 @@ def run(argv=None, assert_results=None):
nomads = grouped | beam.Filter( # People without addresses.
lambda (name, (email, phone, snailmail)): not next(iter(snailmail), None))
- num_luddites = luddites | 'luddites' >> beam.combiners.Count.Globally()
- num_writers = writers | 'writers' >> beam.combiners.Count.Globally()
- num_nomads = nomads | 'nomads' >> beam.combiners.Count.Globally()
+ num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally()
+ num_writers = writers | 'Writers' >> beam.combiners.Count.Globally()
+ num_nomads = nomads | 'Nomads' >> beam.combiners.Count.Globally()
# Write tab-delimited output.
# pylint: disable=expression-not-assigned
- tsv_lines | 'write_tsv' >> WriteToText(known_args.output_tsv)
+ tsv_lines | 'WriteTsv' >> WriteToText(known_args.output_tsv)
# TODO(silviuc): Move the assert_results logic to the unit test.
if assert_results is not None:
http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index e34a64e..7fb2c81 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -52,14 +52,14 @@ def run(argv=None):
# Capitalize the characters in each line.
transformed = (lines
- | 'split' >> (
+ | 'Split' >> (
beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
.with_output_types(unicode))
- | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+ | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
| beam.WindowInto(window.FixedWindows(15, 0))
- | 'group' >> beam.GroupByKey()
- | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
- | 'format' >> beam.Map(lambda tup: '%s: %d' % tup))
+ | 'Group' >> beam.GroupByKey()
+ | 'Count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
+ | 'Format' >> beam.Map(lambda tup: '%s: %d' % tup))
# Write to PubSub.
# pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 772e5e2..f75bc5d 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -762,8 +762,8 @@ class TestNativeTextFileSink(unittest.TestCase):
def test_write_native(self):
pipeline = TestPipeline()
- pcoll = pipeline | 'Create' >> beam.core.Create(self.lines)
- pcoll | 'Write' >> beam.Write(fileio.NativeTextFileSink(self.path)) # pylint: disable=expression-not-assigned
+ pcoll = pipeline | beam.core.Create(self.lines)
+ pcoll | beam.Write(fileio.NativeTextFileSink(self.path)) # pylint: disable=expression-not-assigned
pipeline.run()
read_result = []
@@ -775,8 +775,8 @@ class TestNativeTextFileSink(unittest.TestCase):
def test_write_native_auto_compression(self):
pipeline = TestPipeline()
- pcoll = pipeline | 'Create' >> beam.core.Create(self.lines)
- pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned
+ pcoll = pipeline | beam.core.Create(self.lines)
+ pcoll | beam.Write( # pylint: disable=expression-not-assigned
fileio.NativeTextFileSink(
self.path, file_name_suffix='.gz'))
pipeline.run()
@@ -790,8 +790,8 @@ class TestNativeTextFileSink(unittest.TestCase):
def test_write_native_auto_compression_unsharded(self):
pipeline = TestPipeline()
- pcoll = pipeline | 'Create' >> beam.core.Create(self.lines)
- pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned
+ pcoll = pipeline | beam.core.Create(self.lines)
+ pcoll | beam.Write( # pylint: disable=expression-not-assigned
fileio.NativeTextFileSink(
self.path + '.gz', shard_name_template=''))
pipeline.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 93421a6..12af3b6 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -727,7 +727,7 @@ class Write(ptransform.PTransform):
from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
if isinstance(self.sink, dataflow_io.NativeSink):
# A native sink
- return pcoll | 'native_write' >> dataflow_io._NativeWrite(self.sink)
+ return pcoll | 'NativeWrite' >> dataflow_io._NativeWrite(self.sink)
elif isinstance(self.sink, Sink):
# A custom sink
return pcoll | WriteImpl(self.sink)
@@ -748,7 +748,7 @@ class WriteImpl(ptransform.PTransform):
def expand(self, pcoll):
do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None])
- init_result_coll = do_once | 'initialize_write' >> core.Map(
+ init_result_coll = do_once | 'InitializeWrite' >> core.Map(
lambda _, sink: sink.initialize_write(), self.sink)
if getattr(self.sink, 'num_shards', 0):
min_shards = self.sink.num_shards
@@ -759,20 +759,20 @@ class WriteImpl(ptransform.PTransform):
write_result_coll = (keyed_pcoll
| core.WindowInto(window.GlobalWindows())
| core.GroupByKey()
- | 'write_bundles' >> core.Map(
+ | 'WriteBundles' >> core.Map(
_write_keyed_bundle, self.sink,
AsSingleton(init_result_coll)))
else:
min_shards = 1
write_result_coll = (pcoll
- | 'write_bundles' >>
+ | 'WriteBundles' >>
core.ParDo(
_WriteBundleDoFn(), self.sink,
AsSingleton(init_result_coll))
- | 'pair' >> core.Map(lambda x: (None, x))
+ | 'Pair' >> core.Map(lambda x: (None, x))
| core.WindowInto(window.GlobalWindows())
| core.GroupByKey()
- | 'extract' >> core.FlatMap(lambda x: x[1]))
+ | 'Extract' >> core.FlatMap(lambda x: x[1]))
return do_once | core.FlatMap(
'finalize_write',
_finalize_write,
http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index ccb64ff..4b85584 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -524,7 +524,7 @@ class TextSinkTest(unittest.TestCase):
def test_write_dataflow(self):
pipeline = TestPipeline()
- pcoll = pipeline | 'Create' >> beam.core.Create(self.lines)
+ pcoll = pipeline | beam.core.Create(self.lines)
pcoll | 'Write' >> WriteToText(self.path) # pylint: disable=expression-not-assigned
pipeline.run()
@@ -537,7 +537,7 @@ class TextSinkTest(unittest.TestCase):
def test_write_dataflow_auto_compression(self):
pipeline = TestPipeline()
- pcoll = pipeline | 'Create' >> beam.core.Create(self.lines)
+ pcoll = pipeline | beam.core.Create(self.lines)
pcoll | 'Write' >> WriteToText(self.path, file_name_suffix='.gz') # pylint: disable=expression-not-assigned
pipeline.run()
@@ -550,7 +550,7 @@ class TextSinkTest(unittest.TestCase):
def test_write_dataflow_auto_compression_unsharded(self):
pipeline = TestPipeline()
- pcoll = pipeline | 'Create' >> beam.core.Create(self.lines)
+ pcoll = pipeline | beam.core.Create(self.lines)
pcoll | 'Write' >> WriteToText(self.path + '.gz', shard_name_template='') # pylint: disable=expression-not-assigned
pipeline.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index f95b295..f522590 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -84,9 +84,9 @@ class RunnerTest(unittest.TestCase):
p = Pipeline(remote_runner,
options=PipelineOptions(self.default_properties))
- (p | 'create' >> ptransform.Create([1, 2, 3]) # pylint: disable=expression-not-assigned
- | 'do' >> ptransform.FlatMap(lambda x: [(x, x)])
- | 'gbk' >> ptransform.GroupByKey())
+ (p | ptransform.Create([1, 2, 3]) # pylint: disable=expression-not-assigned
+ | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
+ | ptransform.GroupByKey())
remote_runner.job = apiclient.Job(p.options)
super(DataflowRunner, remote_runner).run(p)
@@ -118,8 +118,8 @@ class RunnerTest(unittest.TestCase):
now = datetime.now()
# pylint: disable=expression-not-assigned
- (p | 'create' >> ptransform.Create([1, 2, 3, 4, 5])
- | 'do' >> SpecialParDo(SpecialDoFn(), now))
+ (p | ptransform.Create([1, 2, 3, 4, 5])
+ | 'Do' >> SpecialParDo(SpecialDoFn(), now))
remote_runner.job = apiclient.Job(p.options)
super(DataflowRunner, remote_runner).run(p)
@@ -166,8 +166,8 @@ class RunnerTest(unittest.TestCase):
p = Pipeline(runner,
options=PipelineOptions(self.default_properties))
# pylint: disable=expression-not-assigned
- (p | 'create' >> ptransform.Create([1, 2, 3, 4, 5])
- | 'do' >> beam.ParDo(MyDoFn()))
+ (p | ptransform.Create([1, 2, 3, 4, 5])
+ | 'Do' >> beam.ParDo(MyDoFn()))
result = p.run()
result.wait_until_finish()
metrics = result.metrics().query()
@@ -178,19 +178,19 @@ class RunnerTest(unittest.TestCase):
metrics['counters'],
hc.contains_inanyorder(
MetricResult(
- MetricKey('do', MetricName(namespace, 'elements')),
+ MetricKey('Do', MetricName(namespace, 'elements')),
5, 5),
MetricResult(
- MetricKey('do', MetricName(namespace, 'bundles')),
+ MetricKey('Do', MetricName(namespace, 'bundles')),
1, 1),
MetricResult(
- MetricKey('do', MetricName(namespace, 'finished_bundles')),
+ MetricKey('Do', MetricName(namespace, 'finished_bundles')),
1, 1)))
hc.assert_that(
metrics['distributions'],
hc.contains_inanyorder(
MetricResult(
- MetricKey('do', MetricName(namespace, 'element_dist')),
+ MetricKey('Do', MetricName(namespace, 'element_dist')),
DistributionResult(DistributionData(15, 5, 1, 5)),
DistributionResult(DistributionData(15, 5, 1, 5)))))
@@ -205,8 +205,7 @@ class RunnerTest(unittest.TestCase):
'--temp_location=/dev/null',
'--no_auth=True'
]))
- rows = p | 'read' >> beam.io.Read(
- beam.io.BigQuerySource('dataset.faketable'))
+ rows = p | beam.io.Read(beam.io.BigQuerySource('dataset.faketable'))
with self.assertRaises(ValueError,
msg=('Coder for the GroupByKey operation'
'"GroupByKey" is not a key-value coder: '
http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 13b963c..827bc83 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -54,8 +54,8 @@ class PTransformTest(unittest.TestCase):
str(PTransform()))
pa = TestPipeline()
- res = pa | 'a_label' >> beam.Create([1, 2])
- self.assertEqual('AppliedPTransform(a_label, Create)',
+ res = pa | 'ALabel' >> beam.Create([1, 2])
+ self.assertEqual('AppliedPTransform(ALabel, Create)',
str(res.producer))
pc = TestPipeline()
@@ -111,8 +111,8 @@ class PTransformTest(unittest.TestCase):
return [context.element + addon]
pipeline = TestPipeline()
- pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
- result = pcoll | 'do' >> beam.ParDo(AddNDoFn(), 10)
+ pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3])
+ result = pcoll | 'Do' >> beam.ParDo(AddNDoFn(), 10)
assert_that(result, equal_to([11, 12, 13]))
pipeline.run()
@@ -123,39 +123,39 @@ class PTransformTest(unittest.TestCase):
pass
pipeline = TestPipeline()
- pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
+ pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3])
with self.assertRaises(ValueError):
- pcoll | 'do' >> beam.ParDo(MyDoFn) # Note the lack of ()'s
+ pcoll | 'Do' >> beam.ParDo(MyDoFn) # Note the lack of ()'s
def test_do_with_callable(self):
pipeline = TestPipeline()
- pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
- result = pcoll | 'do' >> beam.FlatMap(lambda x, addon: [x + addon], 10)
+ pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3])
+ result = pcoll | 'Do' >> beam.FlatMap(lambda x, addon: [x + addon], 10)
assert_that(result, equal_to([11, 12, 13]))
pipeline.run()
def test_do_with_side_input_as_arg(self):
pipeline = TestPipeline()
- side = pipeline | 'side' >> beam.Create([10])
- pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
- result = pcoll | beam.FlatMap(
- 'do', lambda x, addon: [x + addon], pvalue.AsSingleton(side))
+ side = pipeline | 'Side' >> beam.Create([10])
+ pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3])
+ result = pcoll | 'Do' >> beam.FlatMap(
+ lambda x, addon: [x + addon], pvalue.AsSingleton(side))
assert_that(result, equal_to([11, 12, 13]))
pipeline.run()
def test_do_with_side_input_as_keyword_arg(self):
pipeline = TestPipeline()
- side = pipeline | 'side' >> beam.Create([10])
- pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
- result = pcoll | beam.FlatMap(
- 'do', lambda x, addon: [x + addon], addon=pvalue.AsSingleton(side))
+ side = pipeline | 'Side' >> beam.Create([10])
+ pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3])
+ result = pcoll | 'Do' >> beam.FlatMap(
+ lambda x, addon: [x + addon], addon=pvalue.AsSingleton(side))
assert_that(result, equal_to([11, 12, 13]))
pipeline.run()
def test_do_with_do_fn_returning_string_raises_warning(self):
pipeline = TestPipeline()
- pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3'])
- pcoll | 'do' >> beam.FlatMap(lambda x: x + '1')
+ pcoll = pipeline | 'Start' >> beam.Create(['2', '9', '3'])
+ pcoll | 'Do' >> beam.FlatMap(lambda x: x + '1')
# Since the DoFn directly returns a string we should get an error warning
# us.
@@ -168,8 +168,8 @@ class PTransformTest(unittest.TestCase):
def test_do_with_do_fn_returning_dict_raises_warning(self):
pipeline = TestPipeline()
- pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3'])
- pcoll | 'do' >> beam.FlatMap(lambda x: {x: '1'})
+ pcoll = pipeline | 'Start' >> beam.Create(['2', '9', '3'])
+ pcoll | 'Do' >> beam.FlatMap(lambda x: {x: '1'})
# Since the DoFn directly returns a dict we should get an error warning
# us.
@@ -182,9 +182,9 @@ class PTransformTest(unittest.TestCase):
def test_do_with_side_outputs_maintains_unique_name(self):
pipeline = TestPipeline()
- pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
- r1 = pcoll | 'a' >> beam.FlatMap(lambda x: [x + 1]).with_outputs(main='m')
- r2 = pcoll | 'b' >> beam.FlatMap(lambda x: [x + 2]).with_outputs(main='m')
+ pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3])
+ r1 = pcoll | 'A' >> beam.FlatMap(lambda x: [x + 1]).with_outputs(main='m')
+ r2 = pcoll | 'B' >> beam.FlatMap(lambda x: [x + 2]).with_outputs(main='m')
assert_that(r1.m, equal_to([2, 3, 4]), label='r1')
assert_that(r2.m, equal_to([3, 4, 5]), label='r2')
pipeline.run()
@@ -195,8 +195,8 @@ class PTransformTest(unittest.TestCase):
def incorrect_par_do_fn(x):
return x + 5
pipeline = TestPipeline()
- pcoll = pipeline | 'start' >> beam.Create([2, 9, 3])
- pcoll | 'do' >> beam.FlatMap(incorrect_par_do_fn)
+ pcoll = pipeline | 'Start' >> beam.Create([2, 9, 3])
+ pcoll | 'Do' >> beam.FlatMap(incorrect_par_do_fn)
# It's a requirement that all user-defined functions to a ParDo return
# an iterable.
with self.assertRaises(typehints.TypeCheckError) as cm:
@@ -216,8 +216,8 @@ class PTransformTest(unittest.TestCase):
def finish_bundle(self, c):
yield 'finish'
pipeline = TestPipeline()
- pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
- result = pcoll | 'do' >> beam.ParDo(MyDoFn())
+ pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3])
+ result = pcoll | 'Do' >> beam.ParDo(MyDoFn())
# May have many bundles, but each has a start and finish.
def matcher():
@@ -231,9 +231,8 @@ class PTransformTest(unittest.TestCase):
def test_filter(self):
pipeline = TestPipeline()
- pcoll = pipeline | 'start' >> beam.Create([1, 2, 3, 4])
- result = pcoll | beam.Filter(
- 'filter', lambda x: x % 2 == 0)
+ pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3, 4])
+ result = pcoll | 'Filter' >> beam.Filter(lambda x: x % 2 == 0)
assert_that(result, equal_to([2, 4]))
pipeline.run()
@@ -257,15 +256,15 @@ class PTransformTest(unittest.TestCase):
def test_combine_with_combine_fn(self):
vals = [1, 2, 3, 4, 5, 6, 7]
pipeline = TestPipeline()
- pcoll = pipeline | 'start' >> beam.Create(vals)
- result = pcoll | 'mean' >> beam.CombineGlobally(self._MeanCombineFn())
+ pcoll = pipeline | 'Start' >> beam.Create(vals)
+ result = pcoll | 'Mean' >> beam.CombineGlobally(self._MeanCombineFn())
assert_that(result, equal_to([sum(vals) / len(vals)]))
pipeline.run()
def test_combine_with_callable(self):
vals = [1, 2, 3, 4, 5, 6, 7]
pipeline = TestPipeline()
- pcoll = pipeline | 'start' >> beam.Create(vals)
+ pcoll = pipeline | 'Start' >> beam.Create(vals)
result = pcoll | beam.CombineGlobally(sum)
assert_that(result, equal_to([sum(vals)]))
pipeline.run()
@@ -273,10 +272,9 @@ class PTransformTest(unittest.TestCase):
def test_combine_with_side_input_as_arg(self):
values = [1, 2, 3, 4, 5, 6, 7]
pipeline = TestPipeline()
- pcoll = pipeline | 'start' >> beam.Create(values)
- divisor = pipeline | 'divisor' >> beam.Create([2])
- result = pcoll | beam.CombineGlobally(
- 'max',
+ pcoll = pipeline | 'Start' >> beam.Create(values)
+ divisor = pipeline | 'Divisor' >> beam.Create([2])
+ result = pcoll | 'Max' >> beam.CombineGlobally(
# Multiples of divisor only.
lambda vals, d: max(v for v in vals if v % d == 0),
pvalue.AsSingleton(divisor)).without_defaults()
@@ -288,9 +286,9 @@ class PTransformTest(unittest.TestCase):
vals_1 = [1, 2, 3, 4, 5, 6, 7]
vals_2 = [2, 4, 6, 8, 10, 12, 14]
pipeline = TestPipeline()
- pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
+ pcoll = pipeline | 'Start' >> beam.Create(([('a', x) for x in vals_1] +
[('b', x) for x in vals_2]))
- result = pcoll | 'mean' >> beam.CombinePerKey(self._MeanCombineFn())
+ result = pcoll | 'Mean' >> beam.CombinePerKey(self._MeanCombineFn())
assert_that(result, equal_to([('a', sum(vals_1) / len(vals_1)),
('b', sum(vals_2) / len(vals_2))]))
pipeline.run()
@@ -299,7 +297,7 @@ class PTransformTest(unittest.TestCase):
vals_1 = [1, 2, 3, 4, 5, 6, 7]
vals_2 = [2, 4, 6, 8, 10, 12, 14]
pipeline = TestPipeline()
- pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
+ pcoll = pipeline | 'Start' >> beam.Create(([('a', x) for x in vals_1] +
[('b', x) for x in vals_2]))
result = pcoll | beam.CombinePerKey(sum)
assert_that(result, equal_to([('a', sum(vals_1)), ('b', sum(vals_2))]))
@@ -309,9 +307,9 @@ class PTransformTest(unittest.TestCase):
vals_1 = [1, 2, 3, 4, 5, 6, 7]
vals_2 = [2, 4, 6, 8, 10, 12, 14]
pipeline = TestPipeline()
- pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
+ pcoll = pipeline | 'Start' >> beam.Create(([('a', x) for x in vals_1] +
[('b', x) for x in vals_2]))
- divisor = pipeline | 'divisor' >> beam.Create([2])
+ divisor = pipeline | 'Divisor' >> beam.Create([2])
result = pcoll | beam.CombinePerKey(
lambda vals, d: max(v for v in vals if v % d == 0),
pvalue.AsSingleton(divisor)) # Multiples of divisor only.
@@ -324,7 +322,7 @@ class PTransformTest(unittest.TestCase):
pipeline = TestPipeline()
pcoll = pipeline | beam.Create(
'start', [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)])
- result = pcoll | 'group' >> beam.GroupByKey()
+ result = pcoll | 'Group' >> beam.GroupByKey()
assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
pipeline.run()
@@ -336,9 +334,9 @@ class PTransformTest(unittest.TestCase):
return (context.element % 3) + offset
pipeline = TestPipeline()
- pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
+ pcoll = pipeline | 'Start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
# Attempt nominal partition operation.
- partitions = pcoll | 'part1' >> beam.Partition(SomePartitionFn(), 4, 1)
+ partitions = pcoll | 'Part 1' >> beam.Partition(SomePartitionFn(), 4, 1)
assert_that(partitions[0], equal_to([]))
assert_that(partitions[1], equal_to([0, 3, 6]), label='p1')
assert_that(partitions[2], equal_to([1, 4, 7]), label='p2')
@@ -348,14 +346,14 @@ class PTransformTest(unittest.TestCase):
# Check that a bad partition label will yield an error. For the
# DirectRunner, this error manifests as an exception.
pipeline = TestPipeline()
- pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
- partitions = pcoll | 'part2' >> beam.Partition(SomePartitionFn(), 4, 10000)
+ pcoll = pipeline | 'Start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
+ partitions = pcoll | 'Part 2' >> beam.Partition(SomePartitionFn(), 4, 10000)
with self.assertRaises(ValueError):
pipeline.run()
def test_partition_with_callable(self):
pipeline = TestPipeline()
- pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
+ pcoll = pipeline | 'Start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
partitions = (
pcoll | beam.Partition(
'part',
@@ -380,48 +378,47 @@ class PTransformTest(unittest.TestCase):
def test_flatten_pcollections(self):
pipeline = TestPipeline()
- pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3])
- pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7])
- result = (pcoll_1, pcoll_2) | 'flatten' >> beam.Flatten()
+ pcoll_1 = pipeline | 'Start 1' >> beam.Create([0, 1, 2, 3])
+ pcoll_2 = pipeline | 'Start 2' >> beam.Create([4, 5, 6, 7])
+ result = (pcoll_1, pcoll_2) | 'Flatten' >> beam.Flatten()
assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7]))
pipeline.run()
def test_flatten_no_pcollections(self):
pipeline = TestPipeline()
with self.assertRaises(ValueError):
- () | 'pipeline arg missing' >> beam.Flatten()
- result = () | 'empty' >> beam.Flatten(pipeline=pipeline)
+ () | 'PipelineArgMissing' >> beam.Flatten()
+ result = () | 'Empty' >> beam.Flatten(pipeline=pipeline)
assert_that(result, equal_to([]))
pipeline.run()
def test_flatten_pcollections_in_iterable(self):
pipeline = TestPipeline()
- pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3])
- pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7])
- result = ([pcoll for pcoll in (pcoll_1, pcoll_2)]
- | 'flatten' >> beam.Flatten())
+ pcoll_1 = pipeline | 'Start 1' >> beam.Create([0, 1, 2, 3])
+ pcoll_2 = pipeline | 'Start 2' >> beam.Create([4, 5, 6, 7])
+ result = [pcoll for pcoll in (pcoll_1, pcoll_2)] | beam.Flatten()
assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7]))
pipeline.run()
def test_flatten_input_type_must_be_iterable(self):
# Inputs to flatten *must* be an iterable.
with self.assertRaises(ValueError):
- 4 | 'flatten' >> beam.Flatten()
+ 4 | beam.Flatten()
def test_flatten_input_type_must_be_iterable_of_pcolls(self):
# Inputs to flatten *must* be an iterable of PCollections.
with self.assertRaises(TypeError):
- {'l': 'test'} | 'flatten' >> beam.Flatten()
+ {'l': 'test'} | beam.Flatten()
with self.assertRaises(TypeError):
- set([1, 2, 3]) | 'flatten' >> beam.Flatten()
+ set([1, 2, 3]) | beam.Flatten()
def test_co_group_by_key_on_list(self):
pipeline = TestPipeline()
- pcoll_1 = pipeline | beam.Create(
- 'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
- pcoll_2 = pipeline | beam.Create(
- 'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
- result = (pcoll_1, pcoll_2) | 'cgbk' >> beam.CoGroupByKey()
+ pcoll_1 = pipeline | 'Start 1' >> beam.Create(
+ [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
+ pcoll_2 = pipeline | 'Start 2' >> beam.Create(
+ [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
+ result = (pcoll_1, pcoll_2) | beam.CoGroupByKey()
assert_that(result, equal_to([('a', ([1, 2], [5, 6])),
('b', ([3], [])),
('c', ([4], [7, 8]))]))
@@ -429,12 +426,11 @@ class PTransformTest(unittest.TestCase):
def test_co_group_by_key_on_iterable(self):
pipeline = TestPipeline()
- pcoll_1 = pipeline | beam.Create(
- 'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
- pcoll_2 = pipeline | beam.Create(
- 'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
- result = ([pc for pc in (pcoll_1, pcoll_2)]
- | 'cgbk' >> beam.CoGroupByKey())
+ pcoll_1 = pipeline | 'Start 1' >> beam.Create(
+ [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
+ pcoll_2 = pipeline | 'Start 2' >> beam.Create(
+ [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
+ result = [pc for pc in (pcoll_1, pcoll_2)] | beam.CoGroupByKey()
assert_that(result, equal_to([('a', ([1, 2], [5, 6])),
('b', ([3], [])),
('c', ([4], [7, 8]))]))
@@ -442,11 +438,11 @@ class PTransformTest(unittest.TestCase):
def test_co_group_by_key_on_dict(self):
pipeline = TestPipeline()
- pcoll_1 = pipeline | beam.Create(
- 'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
- pcoll_2 = pipeline | beam.Create(
- 'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
- result = {'X': pcoll_1, 'Y': pcoll_2} | 'cgbk' >> beam.CoGroupByKey()
+ pcoll_1 = pipeline | 'Start 1' >> beam.Create(
+ [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
+ pcoll_2 = pipeline | 'Start 2' >> beam.Create(
+ [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
+ result = {'X': pcoll_1, 'Y': pcoll_2} | beam.CoGroupByKey()
assert_that(result, equal_to([('a', {'X': [1, 2], 'Y': [5, 6]}),
('b', {'X': [3], 'Y': []}),
('c', {'X': [4], 'Y': [7, 8]})]))
@@ -478,8 +474,8 @@ class PTransformTest(unittest.TestCase):
def test_keys_and_values(self):
pipeline = TestPipeline()
- pcoll = pipeline | beam.Create(
- 'start', [(3, 1), (2, 1), (1, 1), (3, 2), (2, 2), (3, 3)])
+ pcoll = pipeline | 'Start' >> beam.Create(
+ [(3, 1), (2, 1), (1, 1), (3, 2), (2, 2), (3, 3)])
keys = pcoll.apply('keys', beam.Keys())
vals = pcoll.apply('vals', beam.Values())
assert_that(keys, equal_to([1, 2, 2, 3, 3, 3]), label='assert:keys')
@@ -488,16 +484,16 @@ class PTransformTest(unittest.TestCase):
def test_kv_swap(self):
pipeline = TestPipeline()
- pcoll = pipeline | beam.Create(
- 'start', [(6, 3), (1, 2), (7, 1), (5, 2), (3, 2)])
+ pcoll = pipeline | 'Start' >> beam.Create(
+ [(6, 3), (1, 2), (7, 1), (5, 2), (3, 2)])
result = pcoll.apply('swap', beam.KvSwap())
assert_that(result, equal_to([(1, 7), (2, 1), (2, 3), (2, 5), (3, 6)]))
pipeline.run()
def test_remove_duplicates(self):
pipeline = TestPipeline()
- pcoll = pipeline | beam.Create(
- 'start', [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel'])
+ pcoll = pipeline | 'Start' >> beam.Create(
+ [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel'])
result = pcoll.apply('nodupes', beam.RemoveDuplicates())
assert_that(result, equal_to([1, 3, 6, 9, 'pleat', 'kazoo', 'navel']))
pipeline.run()
@@ -507,15 +503,15 @@ class PTransformTest(unittest.TestCase):
t = (beam.Map(lambda x: (x, 1))
| beam.GroupByKey()
| beam.Map(lambda (x, ones): (x, sum(ones))))
- result = pipeline | 'start' >> beam.Create(['a', 'a', 'b']) | t
+ result = pipeline | 'Start' >> beam.Create(['a', 'a', 'b']) | t
assert_that(result, equal_to([('a', 2), ('b', 1)]))
pipeline.run()
def test_apply_to_list(self):
self.assertItemsEqual(
- [1, 2, 3], [0, 1, 2] | 'add_one' >> beam.Map(lambda x: x + 1))
+ [1, 2, 3], [0, 1, 2] | 'AddOne' >> beam.Map(lambda x: x + 1))
self.assertItemsEqual([1],
- [0, 1, 2] | 'odd' >> beam.Filter(lambda x: x % 2))
+ [0, 1, 2] | 'Odd' >> beam.Filter(lambda x: x % 2))
self.assertItemsEqual([1, 2, 100, 3],
([1, 2, 3], [100]) | beam.Flatten())
join_input = ([('k', 'a')],
@@ -575,47 +571,47 @@ class PTransformLabelsTest(unittest.TestCase):
pardo = None
def expand(self, pcoll):
- self.pardo = '*do*' >> beam.FlatMap(lambda x: [x + 1])
+ self.pardo = '*Do*' >> beam.FlatMap(lambda x: [x + 1])
return pcoll | self.pardo
def test_chained_ptransforms(self):
"""Tests that chaining gets proper nesting."""
pipeline = TestPipeline()
- map1 = 'map1' >> beam.Map(lambda x: (x, 1))
- gbk = 'gbk' >> beam.GroupByKey()
- map2 = 'map2' >> beam.Map(lambda (x, ones): (x, sum(ones)))
+ map1 = 'Map1' >> beam.Map(lambda x: (x, 1))
+ gbk = 'Gbk' >> beam.GroupByKey()
+ map2 = 'Map2' >> beam.Map(lambda (x, ones): (x, sum(ones)))
t = (map1 | gbk | map2)
- result = pipeline | 'start' >> beam.Create(['a', 'a', 'b']) | t
- self.assertTrue('map1|gbk|map2/map1' in pipeline.applied_labels)
- self.assertTrue('map1|gbk|map2/gbk' in pipeline.applied_labels)
- self.assertTrue('map1|gbk|map2/map2' in pipeline.applied_labels)
+ result = pipeline | 'Start' >> beam.Create(['a', 'a', 'b']) | t
+ self.assertTrue('Map1|Gbk|Map2/Map1' in pipeline.applied_labels)
+ self.assertTrue('Map1|Gbk|Map2/Gbk' in pipeline.applied_labels)
+ self.assertTrue('Map1|Gbk|Map2/Map2' in pipeline.applied_labels)
assert_that(result, equal_to([('a', 2), ('b', 1)]))
pipeline.run()
def test_apply_custom_transform_without_label(self):
pipeline = TestPipeline()
- pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
+ pcoll = pipeline | 'PColl' >> beam.Create([1, 2, 3])
custom = PTransformLabelsTest.CustomTransform()
result = pipeline.apply(custom, pcoll)
self.assertTrue('CustomTransform' in pipeline.applied_labels)
- self.assertTrue('CustomTransform/*do*' in pipeline.applied_labels)
+ self.assertTrue('CustomTransform/*Do*' in pipeline.applied_labels)
assert_that(result, equal_to([2, 3, 4]))
pipeline.run()
def test_apply_custom_transform_with_label(self):
pipeline = TestPipeline()
- pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
- custom = PTransformLabelsTest.CustomTransform('*custom*')
+ pcoll = pipeline | 'PColl' >> beam.Create([1, 2, 3])
+ custom = PTransformLabelsTest.CustomTransform('*Custom*')
result = pipeline.apply(custom, pcoll)
- self.assertTrue('*custom*' in pipeline.applied_labels)
- self.assertTrue('*custom*/*do*' in pipeline.applied_labels)
+ self.assertTrue('*Custom*' in pipeline.applied_labels)
+ self.assertTrue('*Custom*/*Do*' in pipeline.applied_labels)
assert_that(result, equal_to([2, 3, 4]))
pipeline.run()
def test_combine_without_label(self):
vals = [1, 2, 3, 4, 5, 6, 7]
pipeline = TestPipeline()
- pcoll = pipeline | 'start' >> beam.Create(vals)
+ pcoll = pipeline | 'Start' >> beam.Create(vals)
combine = beam.CombineGlobally(sum)
result = pcoll | combine
self.assertTrue('CombineGlobally(sum)' in pipeline.applied_labels)
@@ -624,28 +620,28 @@ class PTransformLabelsTest(unittest.TestCase):
def test_apply_ptransform_using_decorator(self):
pipeline = TestPipeline()
- pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
- sample = SamplePTransform('*sample*')
+ pcoll = pipeline | 'PColl' >> beam.Create([1, 2, 3])
+ sample = SamplePTransform('*Sample*')
_ = pcoll | sample
- self.assertTrue('*sample*' in pipeline.applied_labels)
- self.assertTrue('*sample*/ToPairs' in pipeline.applied_labels)
- self.assertTrue('*sample*/Group' in pipeline.applied_labels)
- self.assertTrue('*sample*/RemoveDuplicates' in pipeline.applied_labels)
+ self.assertTrue('*Sample*' in pipeline.applied_labels)
+ self.assertTrue('*Sample*/ToPairs' in pipeline.applied_labels)
+ self.assertTrue('*Sample*/Group' in pipeline.applied_labels)
+ self.assertTrue('*Sample*/RemoveDuplicates' in pipeline.applied_labels)
def test_combine_with_label(self):
vals = [1, 2, 3, 4, 5, 6, 7]
pipeline = TestPipeline()
- pcoll = pipeline | 'start' >> beam.Create(vals)
- combine = '*sum*' >> beam.CombineGlobally(sum)
+ pcoll = pipeline | 'Start' >> beam.Create(vals)
+ combine = '*Sum*' >> beam.CombineGlobally(sum)
result = pcoll | combine
- self.assertTrue('*sum*' in pipeline.applied_labels)
+ self.assertTrue('*Sum*' in pipeline.applied_labels)
assert_that(result, equal_to([sum(vals)]))
pipeline.run()
def check_label(self, ptransform, expected_label):
pipeline = TestPipeline()
- pipeline | 'start' >> beam.Create([('a', 1)]) | ptransform
- actual_label = sorted(pipeline.applied_labels - {'start'})[0]
+ pipeline | 'Start' >> beam.Create([('a', 1)]) | ptransform
+ actual_label = sorted(pipeline.applied_labels - {'Start'})[0]
self.assertEqual(expected_label, re.sub(r'\d{3,}', '#', actual_label))
def test_default_labels(self):
@@ -737,8 +733,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
return [context.element + five]
d = (self.p
- | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
- | 'add' >> beam.ParDo(AddWithFive(), 5))
+ | 'T' >> beam.Create([1, 2, 3]).with_output_types(int)
+ | 'Add' >> beam.ParDo(AddWithFive(), 5))
assert_that(d, equal_to([6, 7, 8]))
self.p.run()
@@ -752,10 +748,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
- | 'upper' >> beam.ParDo(ToUpperCaseWithPrefix(), 'hello'))
+ | 'T' >> beam.Create([1, 2, 3]).with_output_types(int)
+ | 'Upper' >> beam.ParDo(ToUpperCaseWithPrefix(), 'hello'))
- self.assertEqual("Type hint violation for 'upper': "
+ self.assertEqual("Type hint violation for 'Upper': "
"requires <type 'str'> but got <type 'int'> for context",
e.exception.message)
@@ -769,8 +765,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
return [context.element + num]
d = (self.p
- | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
- | 'add' >> beam.ParDo(AddWithNum(), 5))
+ | 'T' >> beam.Create([1, 2, 3]).with_output_types(int)
+ | 'Add' >> beam.ParDo(AddWithNum(), 5))
assert_that(d, equal_to([6, 7, 8]))
self.p.run()
@@ -786,11 +782,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 't' >> beam.Create(['1', '2', '3']).with_output_types(str)
- | 'add' >> beam.ParDo(AddWithNum(), 5))
+ | 'T' >> beam.Create(['1', '2', '3']).with_output_types(str)
+ | 'Add' >> beam.ParDo(AddWithNum(), 5))
self.p.run()
- self.assertEqual("Type hint violation for 'add': "
+ self.assertEqual("Type hint violation for 'Add': "
"requires <type 'int'> but got <type 'str'> for context",
e.exception.message)
@@ -804,10 +800,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# will receive a str instead, which should result in a raised exception.
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 's' >> beam.Create(['b', 'a', 'r']).with_output_types(str)
- | 'to str' >> beam.FlatMap(int_to_str))
+ | 'S' >> beam.Create(['b', 'a', 'r']).with_output_types(str)
+ | 'ToStr' >> beam.FlatMap(int_to_str))
- self.assertEqual("Type hint violation for 'to str': "
+ self.assertEqual("Type hint violation for 'ToStr': "
"requires <type 'int'> but got <type 'str'> for a",
e.exception.message)
@@ -819,8 +815,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# If this type-checks than no error should be raised.
d = (self.p
- | 't' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
- | 'case' >> beam.FlatMap(to_all_upper_case))
+ | 'T' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+ | 'Case' >> beam.FlatMap(to_all_upper_case))
assert_that(d, equal_to(['T', 'E', 'S', 'T']))
self.p.run()
@@ -833,23 +829,23 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# expecting pcoll's of type str instead.
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
- | ('score' >> beam.FlatMap(lambda x: [1] if x == 't' else [2])
+ | 'S' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+ | ('Score' >> beam.FlatMap(lambda x: [1] if x == 't' else [2])
.with_input_types(str).with_output_types(int))
- | ('upper' >> beam.FlatMap(lambda x: [x.upper()])
+ | ('Upper' >> beam.FlatMap(lambda x: [x.upper()])
.with_input_types(str).with_output_types(str)))
- self.assertEqual("Type hint violation for 'upper': "
+ self.assertEqual("Type hint violation for 'Upper': "
"requires <type 'str'> but got <type 'int'> for x",
e.exception.message)
def test_pardo_properly_type_checks_using_type_hint_methods(self):
# Pipeline should be created successfully without an error
d = (self.p
- | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
- | 'dup' >> beam.FlatMap(lambda x: [x + x])
+ | 'S' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+ | 'Dup' >> beam.FlatMap(lambda x: [x + x])
.with_input_types(str).with_output_types(str)
- | 'upper' >> beam.FlatMap(lambda x: [x.upper()])
+ | 'Upper' >> beam.FlatMap(lambda x: [x.upper()])
.with_input_types(str).with_output_types(str))
assert_that(d, equal_to(['TT', 'EE', 'SS', 'TT']))
@@ -860,19 +856,19 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# int's, while Map is expecting one of str.
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
- | 'upper' >> beam.Map(lambda x: x.upper())
+ | 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+ | 'Upper' >> beam.Map(lambda x: x.upper())
.with_input_types(str).with_output_types(str))
- self.assertEqual("Type hint violation for 'upper': "
+ self.assertEqual("Type hint violation for 'Upper': "
"requires <type 'str'> but got <type 'int'> for x",
e.exception.message)
def test_map_properly_type_checks_using_type_hints_methods(self):
# No error should be raised if this type-checks properly.
d = (self.p
- | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
- | 'to_str' >> beam.Map(lambda x: str(x))
+ | 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+ | 'ToStr' >> beam.Map(lambda x: str(x))
.with_input_types(int).with_output_types(str))
assert_that(d, equal_to(['1', '2', '3', '4']))
self.p.run()
@@ -887,10 +883,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# However, 'Map' should detect that Create has hinted an int instead.
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
- | 'upper' >> beam.Map(upper))
+ | 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+ | 'Upper' >> beam.Map(upper))
- self.assertEqual("Type hint violation for 'upper': "
+ self.assertEqual("Type hint violation for 'Upper': "
"requires <type 'str'> but got <type 'int'> for s",
e.exception.message)
@@ -912,12 +908,12 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# incoming.
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'strs' >> beam.Create(['1', '2', '3', '4', '5']).with_output_types(str)
- | 'lower' >> beam.Map(lambda x: x.lower())
+ | 'Strs' >> beam.Create(['1', '2', '3', '4', '5']).with_output_types(str)
+ | 'Lower' >> beam.Map(lambda x: x.lower())
.with_input_types(str).with_output_types(str)
- | 'below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))
+ | 'Below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))
- self.assertEqual("Type hint violation for 'below 3': "
+ self.assertEqual("Type hint violation for 'Below 3': "
"requires <type 'int'> but got <type 'str'> for x",
e.exception.message)
@@ -925,9 +921,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# No error should be raised if this type-checks properly.
d = (self.p
| beam.Create(['1', '2', '3', '4', '5']).with_output_types(str)
- | 'to int' >> beam.Map(lambda x: int(x))
+ | 'ToInt' >> beam.Map(lambda x: int(x))
.with_input_types(str).with_output_types(int)
- | 'below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))
+ | 'Below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))
assert_that(d, equal_to([1, 2]))
self.p.run()
@@ -939,10 +935,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# Func above was hinted to only take a float, yet an int will be passed.
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'ints' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
- | 'half' >> beam.Filter(more_than_half))
+ | 'Ints' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+ | 'Half' >> beam.Filter(more_than_half))
- self.assertEqual("Type hint violation for 'half': "
+ self.assertEqual("Type hint violation for 'Half': "
"requires <type 'float'> but got <type 'int'> for a",
e.exception.message)
@@ -954,15 +950,15 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# Filter should deduce that it returns the same type that it takes.
(self.p
- | 'str' >> beam.Create(range(5)).with_output_types(int)
- | 'half' >> beam.Filter(half)
- | 'to bool' >> beam.Map(lambda x: bool(x))
+ | 'Str' >> beam.Create(range(5)).with_output_types(int)
+ | 'Half' >> beam.Filter(half)
+ | 'ToBool' >> beam.Map(lambda x: bool(x))
.with_input_types(int).with_output_types(bool))
def test_group_by_key_only_output_type_deduction(self):
d = (self.p
- | 'str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
- | ('pair' >> beam.Map(lambda x: (x, ord(x)))
+ | 'Str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+ | ('Pair' >> beam.Map(lambda x: (x, ord(x)))
.with_output_types(typehints.KV[str, str]))
| beam.GroupByKeyOnly())
@@ -973,8 +969,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_group_by_key_output_type_deduction(self):
d = (self.p
- | 'str' >> beam.Create(range(20)).with_output_types(int)
- | ('pair negative' >> beam.Map(lambda x: (x % 5, -x))
+ | 'Str' >> beam.Create(range(20)).with_output_types(int)
+ | ('PairNegative' >> beam.Map(lambda x: (x % 5, -x))
.with_output_types(typehints.KV[int, int]))
| beam.GroupByKey())
@@ -1016,11 +1012,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# information to the ParDo.
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'nums' >> beam.Create(range(5))
- | 'mod dup' >> beam.FlatMap(lambda x: (x % 2, x)))
+ | 'Nums' >> beam.Create(range(5))
+ | 'ModDup' >> beam.FlatMap(lambda x: (x % 2, x)))
self.assertEqual('Pipeline type checking is enabled, however no output '
- 'type-hint was found for the PTransform Create(nums)',
+ 'type-hint was found for the PTransform Create(Nums)',
e.exception.message)
def test_pipeline_checking_gbk_insufficient_type_information(self):
@@ -1029,13 +1025,13 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# information to GBK-only.
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'nums' >> beam.Create(range(5)).with_output_types(int)
- | 'mod dup' >> beam.Map(lambda x: (x % 2, x))
+ | 'Nums' >> beam.Create(range(5)).with_output_types(int)
+ | 'ModDup' >> beam.Map(lambda x: (x % 2, x))
| beam.GroupByKeyOnly())
self.assertEqual('Pipeline type checking is enabled, however no output '
'type-hint was found for the PTransform '
- 'ParDo(mod dup)',
+ 'ParDo(ModDup)',
e.exception.message)
def test_disable_pipeline_type_check(self):
@@ -1044,8 +1040,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# The pipeline below should raise a TypeError, however pipeline type
# checking was disabled above.
(self.p
- | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
- | 'lower' >> beam.Map(lambda x: x.lower())
+ | 'T' >> beam.Create([1, 2, 3]).with_output_types(int)
+ | 'Lower' >> beam.Map(lambda x: x.lower())
.with_input_types(str).with_output_types(str))
def test_run_time_type_checking_enabled_type_violation(self):
@@ -1060,14 +1056,14 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# Function above has been type-hinted to only accept an int. But in the
# pipeline execution it'll be passed a string due to the output of Create.
(self.p
- | 't' >> beam.Create(['some_string'])
- | 'to str' >> beam.Map(int_to_string))
+ | 'T' >> beam.Create(['some_string'])
+ | 'ToStr' >> beam.Map(int_to_string))
with self.assertRaises(typehints.TypeCheckError) as e:
self.p.run()
self.assertStartswith(
e.exception.message,
- "Runtime type violation detected within ParDo(to str): "
+ "Runtime type violation detected within ParDo(ToStr): "
"Type-hint for argument: 'x' violated. "
"Expected an instance of <type 'int'>, "
"instead found some_string, an instance of <type 'str'>.")
@@ -1084,9 +1080,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# Pipeline checking is off, but the above function should satisfy types at
# run-time.
result = (self.p
- | 't' >> beam.Create(['t', 'e', 's', 't', 'i', 'n', 'g'])
+ | 'T' >> beam.Create(['t', 'e', 's', 't', 'i', 'n', 'g'])
.with_output_types(str)
- | 'gen keys' >> beam.Map(group_with_upper_ord)
+ | 'GenKeys' >> beam.Map(group_with_upper_ord)
| 'O' >> beam.GroupByKey())
assert_that(result, equal_to([(1, ['g']),
@@ -1106,9 +1102,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
return (a % 2, a)
(self.p
- | 'nums' >> beam.Create(range(5)).with_output_types(int)
- | 'is even' >> beam.Map(is_even_as_key)
- | 'parity' >> beam.GroupByKey())
+ | 'Nums' >> beam.Create(range(5)).with_output_types(int)
+ | 'IsEven' >> beam.Map(is_even_as_key)
+ | 'Parity' >> beam.GroupByKey())
# Although all the types appear to be correct when checked at pipeline
# construction. Runtime type-checking should detect the 'is_even_as_key' is
@@ -1118,7 +1114,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.assertStartswith(
e.exception.message,
- "Runtime type violation detected within ParDo(is even): "
+ "Runtime type violation detected within ParDo(IsEven): "
"Tuple[bool, int] hint type-constraint violated. "
"The type of element #0 in the passed tuple is incorrect. "
"Expected an instance of type bool, "
@@ -1135,9 +1131,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
return (a % 2 == 0, a)
result = (self.p
- | 'nums' >> beam.Create(range(5)).with_output_types(int)
- | 'is even' >> beam.Map(is_even_as_key)
- | 'parity' >> beam.GroupByKey())
+ | 'Nums' >> beam.Create(range(5)).with_output_types(int)
+ | 'IsEven' >> beam.Map(is_even_as_key)
+ | 'Parity' >> beam.GroupByKey())
assert_that(result, equal_to([(False, [1, 3]), (True, [0, 2, 4])]))
self.p.run()
@@ -1152,13 +1148,13 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
| beam.Create([1, 2, 3])
- | ('to int' >> beam.FlatMap(lambda x: [int(x)])
+ | ('ToInt' >> beam.FlatMap(lambda x: [int(x)])
.with_input_types(str).with_output_types(int)))
self.p.run()
self.assertStartswith(
e.exception.message,
- "Runtime type violation detected within ParDo(to int): "
+ "Runtime type violation detected within ParDo(ToInt): "
"Type-hint for argument: 'x' violated. "
"Expected an instance of <type 'str'>, "
"instead found 1, an instance of <type 'int'>.")
@@ -1170,14 +1166,14 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
| beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
- | ('add' >> beam.FlatMap(lambda (x, y): [x + y])
+ | ('Add' >> beam.FlatMap(lambda (x, y): [x + y])
.with_input_types(typehints.Tuple[int, int]).with_output_types(int))
)
self.p.run()
self.assertStartswith(
e.exception.message,
- "Runtime type violation detected within ParDo(add): "
+ "Runtime type violation detected within ParDo(Add): "
"Type-hint for argument: 'y' violated. "
"Expected an instance of <type 'int'>, "
"instead found 3.0, an instance of <type 'float'>.")
@@ -1189,14 +1185,13 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# The type-hinted applied via the 'returns()' method indicates the ParDo
# should output an instance of type 'int', however a 'float' will be
# generated instead.
- print "HINTS", beam.FlatMap(
- 'to int',
+ print "HINTS", ('ToInt' >> beam.FlatMap(
lambda x: [float(x)]).with_input_types(int).with_output_types(
- int).get_type_hints()
+ int)).get_type_hints()
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
| beam.Create([1, 2, 3])
- | ('to int' >> beam.FlatMap(lambda x: [float(x)])
+ | ('ToInt' >> beam.FlatMap(lambda x: [float(x)])
.with_input_types(int).with_output_types(int))
)
self.p.run()
@@ -1204,7 +1199,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.assertStartswith(
e.exception.message,
"Runtime type violation detected within "
- "ParDo(to int): "
+ "ParDo(ToInt): "
"According to type-hint expected output should be "
"of type <type 'int'>. Instead, received '1.0', "
"an instance of type <type 'float'>.")
@@ -1219,7 +1214,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
| beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
- | ('swap' >> beam.FlatMap(lambda (x, y): [x + y])
+ | ('Swap' >> beam.FlatMap(lambda (x, y): [x + y])
.with_input_types(typehints.Tuple[int, float])
.with_output_types(typehints.Tuple[float, int]))
)
@@ -1228,7 +1223,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.assertStartswith(
e.exception.message,
"Runtime type violation detected within "
- "ParDo(swap): Tuple type constraint violated. "
+ "ParDo(Swap): Tuple type constraint violated. "
"Valid object instance must be of type 'tuple'. Instead, "
"an instance of 'float' was received.")
@@ -1242,12 +1237,12 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
return a + b
with self.assertRaises(typehints.TypeCheckError) as e:
- (self.p | beam.Create([1, 2, 3, 4]) | 'add 1' >> beam.Map(add, 1.0))
+ (self.p | beam.Create([1, 2, 3, 4]) | 'Add 1' >> beam.Map(add, 1.0))
self.p.run()
self.assertStartswith(
e.exception.message,
- "Runtime type violation detected within ParDo(add 1): "
+ "Runtime type violation detected within ParDo(Add 1): "
"Type-hint for argument: 'b' violated. "
"Expected an instance of <type 'int'>, "
"instead found 1.0, an instance of <type 'float'>.")
@@ -1259,14 +1254,14 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
| beam.Create([1, 2, 3, 4])
- | ('add 1' >> beam.Map(lambda x, one: x + one, 1.0)
+ | ('Add 1' >> beam.Map(lambda x, one: x + one, 1.0)
.with_input_types(int, int)
.with_output_types(float)))
self.p.run()
self.assertStartswith(
e.exception.message,
- "Runtime type violation detected within ParDo(add 1): "
+ "Runtime type violation detected within ParDo(Add 1): "
"Type-hint for argument: 'one' violated. "
"Expected an instance of <type 'int'>, "
"instead found 1.0, an instance of <type 'float'>.")
@@ -1278,8 +1273,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
return sum(ints)
d = (self.p
- | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
- | 'sum' >> beam.CombineGlobally(sum_ints))
+ | 'T' >> beam.Create([1, 2, 3]).with_output_types(int)
+ | 'Sum' >> beam.CombineGlobally(sum_ints))
self.assertEqual(int, d.element_type)
assert_that(d, equal_to([6]))
@@ -1293,8 +1288,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'm' >> beam.Create([1, 2, 3]).with_output_types(int)
- | 'add' >> beam.CombineGlobally(bad_combine))
+ | 'M' >> beam.Create([1, 2, 3]).with_output_types(int)
+ | 'Add' >> beam.CombineGlobally(bad_combine))
self.assertEqual(
"All functions for a Combine PTransform must accept a "
@@ -1314,9 +1309,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
return list(range(n+1))
d = (self.p
- | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
- | 'sum' >> beam.CombineGlobally(sum_ints)
- | 'range' >> beam.ParDo(range_from_zero))
+ | 'T' >> beam.Create([1, 2, 3]).with_output_types(int)
+ | 'Sum' >> beam.CombineGlobally(sum_ints)
+ | 'Range' >> beam.ParDo(range_from_zero))
self.assertEqual(int, d.element_type)
assert_that(d, equal_to([0, 1, 2, 3, 4, 5, 6]))
@@ -1331,8 +1326,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
return reduce(operator.mul, ints, 1)
d = (self.p
- | 'k' >> beam.Create([5, 5, 5, 5]).with_output_types(int)
- | 'mul' >> beam.CombineGlobally(iter_mul))
+ | 'K' >> beam.Create([5, 5, 5, 5]).with_output_types(int)
+ | 'Mul' >> beam.CombineGlobally(iter_mul))
assert_that(d, equal_to([625]))
self.p.run()
@@ -1349,14 +1344,14 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'k' >> beam.Create([5, 5, 5, 5]).with_output_types(int)
- | 'mul' >> beam.CombineGlobally(iter_mul))
+ | 'K' >> beam.Create([5, 5, 5, 5]).with_output_types(int)
+ | 'Mul' >> beam.CombineGlobally(iter_mul))
self.p.run()
self.assertStartswith(
e.exception.message,
"Runtime type violation detected within "
- "ParDo(mul/CombinePerKey/Combine/ParDo(CombineValuesDoFn)): "
+ "ParDo(Mul/CombinePerKey/Combine/ParDo(CombineValuesDoFn)): "
"Tuple[TypeVariable[K], int] hint type-constraint violated. "
"The type of element #1 in the passed tuple is incorrect. "
"Expected an instance of type int, "
@@ -1381,7 +1376,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
d = (self.p
| beam.Create(range(5)).with_output_types(int)
- | ('sum' >> beam.CombineGlobally(lambda s: sum(s))
+ | ('Sum' >> beam.CombineGlobally(lambda s: sum(s))
.with_input_types(int).with_output_types(int)))
assert_that(d, equal_to([10]))
@@ -1391,10 +1386,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
| beam.Create(range(3)).with_output_types(int)
- | ('sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
+ | ('SortJoin' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
.with_input_types(str).with_output_types(str)))
- self.assertEqual("Input type hint violation at sort join: "
+ self.assertEqual("Input type hint violation at SortJoin: "
"expected <type 'str'>, got <type 'int'>",
e.exception.message)
@@ -1405,14 +1400,14 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
| beam.Create(range(3)).with_output_types(int)
- | ('sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
+ | ('SortJoin' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
.with_input_types(str).with_output_types(str)))
self.p.run()
self.assertStartswith(
e.exception.message,
"Runtime type violation detected within "
- "ParDo(sort join/KeyWithVoid): "
+ "ParDo(SortJoin/KeyWithVoid): "
"Type-hint for argument: 'v' violated. "
"Expected an instance of <type 'str'>, "
"instead found 0, an instance of <type 'int'>.")
@@ -1422,20 +1417,20 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'e' >> beam.Create(range(3)).with_output_types(int)
- | 'sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
- | 'f' >> beam.Map(lambda x: x + 1))
+ | 'E' >> beam.Create(range(3)).with_output_types(int)
+ | 'SortJoin' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
+ | 'F' >> beam.Map(lambda x: x + 1))
self.assertEqual(
'Pipeline type checking is enabled, '
'however no output type-hint was found for the PTransform '
- 'ParDo(sort join/CombinePerKey/Combine/ParDo(CombineValuesDoFn))',
+ 'ParDo(SortJoin/CombinePerKey/Combine/ParDo(CombineValuesDoFn))',
e.exception.message)
def test_mean_globally_pipeline_checking_satisfied(self):
d = (self.p
- | 'c' >> beam.Create(range(5)).with_output_types(int)
- | 'mean' >> combine.Mean.Globally())
+ | 'C' >> beam.Create(range(5)).with_output_types(int)
+ | 'Mean' >> combine.Mean.Globally())
self.assertTrue(d.element_type is float)
assert_that(d, equal_to([2.0]))
@@ -1444,8 +1439,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_mean_globally_pipeline_checking_violated(self):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'c' >> beam.Create(['test']).with_output_types(str)
- | 'mean' >> combine.Mean.Globally())
+ | 'C' >> beam.Create(['test']).with_output_types(str)
+ | 'Mean' >> combine.Mean.Globally())
self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
"requires Tuple[TypeVariable[K], "
@@ -1457,8 +1452,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 'c' >> beam.Create(range(5)).with_output_types(int)
- | 'mean' >> combine.Mean.Globally())
+ | 'C' >> beam.Create(range(5)).with_output_types(int)
+ | 'Mean' >> combine.Mean.Globally())
self.assertTrue(d.element_type is float)
assert_that(d, equal_to([2.0]))
@@ -1470,8 +1465,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | 'c' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
- | 'mean' >> combine.Mean.Globally())
+ | 'C' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+ | 'Mean' >> combine.Mean.Globally())
self.p.run()
self.assertEqual("Runtime type violation detected for transform input "
"when executing ParDoFlatMap(Combine): Tuple[Any, "
@@ -1487,9 +1482,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_mean_per_key_pipeline_checking_satisfied(self):
d = (self.p
| beam.Create(range(5)).with_output_types(int)
- | ('even group' >> beam.Map(lambda x: (not x % 2, x))
+ | ('EvenGroup' >> beam.Map(lambda x: (not x % 2, x))
.with_output_types(typehints.KV[bool, int]))
- | 'even mean' >> combine.Mean.PerKey())
+ | 'EvenMean' >> combine.Mean.PerKey())
self.assertCompatible(typehints.KV[bool, float], d.element_type)
assert_that(d, equal_to([(False, 2.0), (True, 2.0)]))
@@ -1499,9 +1494,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
| beam.Create(map(str, range(5))).with_output_types(str)
- | ('upper pair' >> beam.Map(lambda x: (x.upper(), x))
+ | ('UpperPair' >> beam.Map(lambda x: (x.upper(), x))
.with_output_types(typehints.KV[str, str]))
- | 'even mean' >> combine.Mean.PerKey())
+ | 'EvenMean' >> combine.Mean.PerKey())
self.p.run()
self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
@@ -1515,9 +1510,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
d = (self.p
| beam.Create(range(5)).with_output_types(int)
- | ('odd group' >> beam.Map(lambda x: (bool(x % 2), x))
+ | ('OddGroup' >> beam.Map(lambda x: (bool(x % 2), x))
.with_output_types(typehints.KV[bool, int]))
- | 'odd mean' >> combine.Mean.PerKey())
+ | 'OddMean' >> combine.Mean.PerKey())
self.assertCompatible(typehints.KV[bool, float], d.element_type)
assert_that(d, equal_to([(False, 2.0), (True, 2.0)]))
@@ -1530,15 +1525,15 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
| beam.Create(range(5)).with_output_types(int)
- | ('odd group' >> beam.Map(lambda x: (x, str(bool(x % 2))))
+ | ('OddGroup' >> beam.Map(lambda x: (x, str(bool(x % 2))))
.with_output_types(typehints.KV[int, str]))
- | 'odd mean' >> combine.Mean.PerKey())
+ | 'OddMean' >> combine.Mean.PerKey())
self.p.run()
self.assertStartswith(
e.exception.message,
"Runtime type violation detected within "
- "ParDo(odd mean/CombinePerKey(MeanCombineFn)/"
+ "ParDo(OddMean/CombinePerKey(MeanCombineFn)/"
"Combine/ParDo(CombineValuesDoFn)): "
"Type-hint for argument: 'p_context' violated: "
"Tuple[TypeVariable[K], Iterable[Union[float, int, long]]]"
@@ -1553,8 +1548,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_count_globally_pipeline_type_checking_satisfied(self):
d = (self.p
- | 'p' >> beam.Create(range(5)).with_output_types(int)
- | 'count int' >> combine.Count.Globally())
+ | 'P' >> beam.Create(range(5)).with_output_types(int)
+ | 'CountInt' >> combine.Count.Globally())
self.assertTrue(d.element_type is int)
assert_that(d, equal_to([5]))
@@ -1564,8 +1559,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
- | 'p' >> beam.Create(range(5)).with_output_types(int)
- | 'count int' >> combine.Count.Globally())
+ | 'P' >> beam.Create(range(5)).with_output_types(int)
+ | 'CountInt' >> combine.Count.Globally())
self.assertTrue(d.element_type is int)
assert_that(d, equal_to([5]))
@@ -1574,9 +1569,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_count_perkey_pipeline_type_checking_satisfied(self):
d = (self.p
| beam.Create(range(5)).with_output_types(int)
- | ('even group' >> beam.Map(lambda x: (not x % 2, x))
+ | ('EvenGroup' >> beam.Map(lambda x: (not x % 2, x))
.with_output_types(typehints.KV[bool, int]))
- | 'count int' >> combine.Count.PerKey())
+ | 'CountInt' >> combine.Count.PerKey())
self.assertCompatible(typehints.KV[bool, int], d.element_type)
assert_that(d, equal_to([(False, 2), (True, 3)]))
@@ -1586,7 +1581,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
| beam.Create(range(5)).with_output_types(int)
- | 'count int' >> combine.Count.PerKey())
+ | 'CountInt' >> combine.Count.PerKey())
self.assertEqual("Input type hint violation at GroupByKey: "
"expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -1598,9 +1593,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
d = (self.p
| beam.Create(['t', 'e', 's', 't']).with_output_types(str)
- | 'dup key' >> beam.Map(lambda x: (x, x))
+ | 'DupKey' >> beam.Map(lambda x: (x, x))
.with_output_types(typehints.KV[str, str])
- | 'count dups' >> combine.Count.PerKey())
+ | 'CountDups' >> combine.Count.PerKey())
self.assertCompatible(typehints.KV[str, int], d.element_type)
assert_that(d, equal_to([('e', 1), ('s', 1), ('t', 2)]))
@@ -1609,7 +1604,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_count_perelement_pipeline_type_checking_satisfied(self):
d = (self.p
| beam.Create([1, 1, 2, 3]).with_output_types(int)
- | 'count elems' >> combine.Count.PerElement())
+ | 'CountElems' >> combine.Count.PerElement())
self.assertCompatible(typehints.KV[int, int], d.element_type)
assert_that(d, equal_to([(1, 2), (2, 1), (3, 1)]))
@@ -1621,7 +1616,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
| 'f' >> beam.Create([1, 1, 2, 3])
- | 'count elems' >> combine.Count.PerElement())
+ | 'CountElems' >> combine.Count.PerElement())
self.assertEqual('Pipeline type checking is enabled, however no output '
'type-hint was found for the PTransform '
@@ -1634,7 +1629,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
d = (self.p
| beam.Create([True, True, False, True, True])
.with_output_types(bool)
- | 'count elems' >> combine.Count.PerElement())
+ | 'CountElems' >> combine.Count.PerElement())
self.assertCompatible(typehints.KV[bool, int], d.element_type)
assert_that(d, equal_to([(False, 1), (True, 4)]))
@@ -1643,7 +1638,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_top_of_pipeline_checking_satisfied(self):
d = (self.p
| beam.Create(range(5, 11)).with_output_types(int)
- | 'top 3' >> combine.Top.Of(3, lambda x, y: x < y))
+ | 'Top 3' >> combine.Top.Of(3, lambda x, y: x < y))
self.assertCompatible(typehints.Iterable[int],
d.element_type)
@@ -1655,7 +1650,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
d = (self.p
| beam.Create(list('testing')).with_output_types(str)
- | 'acii top' >> combine.Top.Of(3, lambda x, y: x < y))
+ | 'AciiTop' >> combine.Top.Of(3, lambda x, y: x < y))
self.assertCompatible(typehints.Iterable[str], d.element_type)
assert_that(d, equal_to([['t', 't', 's']]))
@@ -1665,8 +1660,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
| beam.Create(range(100)).with_output_types(int)
- | 'num + 1' >> beam.Map(lambda x: x + 1).with_output_types(int)
- | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
+ | 'Num + 1' >> beam.Map(lambda x: x + 1).with_output_types(int)
+ | 'TopMod' >> combine.Top.PerKey(1, lambda a, b: a < b))
self.assertEqual("Input type hint violation at GroupByKey: "
"expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -1676,9 +1671,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_per_key_pipeline_checking_satisfied(self):
d = (self.p
| beam.Create(range(100)).with_output_types(int)
- | ('group mod 3' >> beam.Map(lambda x: (x % 3, x))
+ | ('GroupMod 3' >> beam.Map(lambda x: (x % 3, x))
.with_output_types(typehints.KV[int, int]))
- | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
+ | 'TopMod' >> combine.Top.PerKey(1, lambda a, b: a < b))
self.assertCompatible(typehints.Tuple[int, typehints.Iterable[int]],
d.element_type)
@@ -1690,9 +1685,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
d = (self.p
| beam.Create(range(21))
- | ('group mod 3' >> beam.Map(lambda x: (x % 3, x))
+ | ('GroupMod 3' >> beam.Map(lambda x: (x % 3, x))
.with_output_types(typehints.KV[int, int]))
- | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
+ | 'TopMod' >> combine.Top.PerKey(1, lambda a, b: a < b))
self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
d.element_type)
@@ -1702,7 +1697,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_sample_globally_pipeline_satisfied(self):
d = (self.p
| beam.Create([2, 2, 3, 3]).with_output_types(int)
- | 'sample' >> combine.Sample.FixedSizeGlobally(3))
+ | 'Sample' >> combine.Sample.FixedSizeGlobally(3))
self.assertCompatible(typehints.Iterable[int], d.element_type)
@@ -1718,7 +1713,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
d = (self.p
| beam.Create([2, 2, 3, 3]).with_output_types(int)
- | 'sample' >> combine.Sample.FixedSizeGlobally(2))
+ | 'Sample' >> combine.Sample.FixedSizeGlobally(2))
self.assertCompatible(typehints.Iterable[int], d.element_type)
@@ -1733,7 +1728,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
d = (self.p
| (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
.with_output_types(typehints.KV[int, int]))
- | 'sample' >> combine.Sample.FixedSizePerKey(2))
+ | 'Sample' >> combine.Sample.FixedSizePerKey(2))
self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
d.element_type)
@@ -1752,7 +1747,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
d = (self.p
| (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
.with_output_types(typehints.KV[int, int]))
- | 'sample' >> combine.Sample.FixedSizePerKey(1))
+ | 'Sample' >> combine.Sample.FixedSizePerKey(1))
self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
d.element_type)
@@ -1835,13 +1830,13 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(TypeError) as e:
(self.p
| beam.Create([1, 2, 3]).with_output_types(int)
- | 'len' >> beam.Map(lambda x: len(x)).with_output_types(int))
+ | 'Len' >> beam.Map(lambda x: len(x)).with_output_types(int))
self.p.run()
# Our special type-checking related TypeError shouldn't have been raised.
# Instead the above pipeline should have triggered a regular Python runtime
# TypeError.
- self.assertEqual("object of type 'int' has no len() [while running 'len']",
+ self.assertEqual("object of type 'int' has no len() [while running 'Len']",
e.exception.message)
self.assertFalse(isinstance(e, typehints.TypeCheckError))
@@ -1869,7 +1864,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
_ = (self.p
| beam.Create(['a', 'b', 'c'])
- | 'ungroupable' >> beam.Map(lambda x: (x, 0, 1.0))
+ | 'Ungroupable' >> beam.Map(lambda x: (x, 0, 1.0))
| beam.GroupByKey())
self.assertEqual('Input type hint violation at GroupByKey: '
@@ -1879,11 +1874,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
def test_type_inference_command_line_flag_toggle(self):
self.p.options.view_as(TypeOptions).pipeline_type_check = False
- x = self.p | 'c1' >> beam.Create([1, 2, 3, 4])
+ x = self.p | 'C1' >> beam.Create([1, 2, 3, 4])
self.assertIsNone(x.element_type)
self.p.options.view_as(TypeOptions).pipeline_type_check = True
- x = self.p | 'c2' >> beam.Create([1, 2, 3, 4])
+ x = self.p | 'C2' >> beam.Create([1, 2, 3, 4])
self.assertEqual(int, x.element_type)