You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/08 16:48:22 UTC
[beam] branch master updated: [BEAM-6781] Changed -with- statement to run job in every load test.
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9c0893c [BEAM-6781] Changed -with- statement to run job in every load test.
new 7f7dd70 Merge pull request #8019 from kkucharc/BEAM-6781-with-statement
9c0893c is described below
commit 9c0893cda58b8e2141c752ac03e72fa12ec03b51
Author: Kasia Kucharczyk <ka...@polidea.com>
AuthorDate: Fri Mar 8 15:52:25 2019 +0100
[BEAM-6781] Changed -with- statement to run job in every load test.
---
.../testing/load_tests/co_group_by_key_test.py | 59 +++++++++++-----------
.../apache_beam/testing/load_tests/combine_test.py | 33 ++++++------
.../testing/load_tests/group_by_key_test.py | 33 ++++++------
.../apache_beam/testing/load_tests/pardo_test.py | 55 ++++++++++----------
.../testing/load_tests/sideinput_test.py | 59 +++++++++++-----------
5 files changed, 117 insertions(+), 122 deletions(-)
diff --git a/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py b/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py
index 3999844..6a86ccc 100644
--- a/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py
@@ -210,36 +210,35 @@ class CoGroupByKeyTest(unittest.TestCase):
yield i
def testCoGroupByKey(self):
- with self.pipeline as p:
- pc1 = (p
- | 'Read ' + INPUT_TAG >> beam.io.Read(
- synthetic_pipeline.SyntheticSource(
- self.parseTestPipelineOptions(self.input_options)))
- | 'Make ' + INPUT_TAG + ' iterable' >> beam.Map(lambda x: (x, x))
- | 'Measure time: Start pc1' >> beam.ParDo(
- MeasureTime(self.metrics_namespace))
- )
-
- pc2 = (p
- | 'Read ' + CO_INPUT_TAG >> beam.io.Read(
- synthetic_pipeline.SyntheticSource(
- self.parseTestPipelineOptions(self.co_input_options)))
- | 'Make ' + CO_INPUT_TAG + ' iterable' >> beam.Map(
- lambda x: (x, x))
- | 'Measure time: Start pc2' >> beam.ParDo(
- MeasureTime(self.metrics_namespace))
- )
- # pylint: disable=expression-not-assigned
- ({INPUT_TAG: pc1, CO_INPUT_TAG: pc2}
- | 'CoGroupByKey: ' >> beam.CoGroupByKey()
- | 'Consume Joined Collections' >> beam.ParDo(self._Ungroup())
- | 'Measure time: End' >> beam.ParDo(MeasureTime(self.metrics_namespace))
- )
-
- result = p.run()
- result.wait_until_finish()
- if self.metrics_monitor is not None:
- self.metrics_monitor.send_metrics(result)
+ pc1 = (self.pipeline
+ | 'Read ' + INPUT_TAG >> beam.io.Read(
+ synthetic_pipeline.SyntheticSource(
+ self.parseTestPipelineOptions(self.input_options)))
+ | 'Make ' + INPUT_TAG + ' iterable' >> beam.Map(lambda x: (x, x))
+ | 'Measure time: Start pc1' >> beam.ParDo(
+ MeasureTime(self.metrics_namespace))
+ )
+
+ pc2 = (self.pipeline
+ | 'Read ' + CO_INPUT_TAG >> beam.io.Read(
+ synthetic_pipeline.SyntheticSource(
+ self.parseTestPipelineOptions(self.co_input_options)))
+ | 'Make ' + CO_INPUT_TAG + ' iterable' >> beam.Map(
+ lambda x: (x, x))
+ | 'Measure time: Start pc2' >> beam.ParDo(
+ MeasureTime(self.metrics_namespace))
+ )
+ # pylint: disable=expression-not-assigned
+ ({INPUT_TAG: pc1, CO_INPUT_TAG: pc2}
+ | 'CoGroupByKey: ' >> beam.CoGroupByKey()
+ | 'Consume Joined Collections' >> beam.ParDo(self._Ungroup())
+ | 'Measure time: End' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+ )
+
+ result = self.pipeline.run()
+ result.wait_until_finish()
+ if self.metrics_monitor is not None:
+ self.metrics_monitor.send_metrics(result)
if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/testing/load_tests/combine_test.py b/sdks/python/apache_beam/testing/load_tests/combine_test.py
index e54acca..a5b18a1 100644
--- a/sdks/python/apache_beam/testing/load_tests/combine_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/combine_test.py
@@ -170,23 +170,22 @@ class CombineTest(unittest.TestCase):
yield element
def testCombineGlobally(self):
- with self.pipeline as p:
- # pylint: disable=expression-not-assigned
- (p
- | beam.io.Read(synthetic_pipeline.SyntheticSource(
- self.parseTestPipelineOptions()))
- | 'Measure time: Start' >> beam.ParDo(
- MeasureTime(self.metrics_namespace))
- | 'Combine with Top' >> beam.CombineGlobally(
- beam.combiners.TopCombineFn(1000))
- | 'Consume' >> beam.ParDo(self._GetElement())
- | 'Measure time: End' >> beam.ParDo(MeasureTime(self.metrics_namespace))
- )
-
- result = p.run()
- result.wait_until_finish()
- if self.metrics_monitor is not None:
- self.metrics_monitor.send_metrics(result)
+ # pylint: disable=expression-not-assigned
+ (self.pipeline
+ | beam.io.Read(synthetic_pipeline.SyntheticSource(
+ self.parseTestPipelineOptions()))
+ | 'Measure time: Start' >> beam.ParDo(
+ MeasureTime(self.metrics_namespace))
+ | 'Combine with Top' >> beam.CombineGlobally(
+ beam.combiners.TopCombineFn(1000))
+ | 'Consume' >> beam.ParDo(self._GetElement())
+ | 'Measure time: End' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+ )
+
+ result = self.pipeline.run()
+ result.wait_until_finish()
+ if self.metrics_monitor is not None:
+ self.metrics_monitor.send_metrics(result)
if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/testing/load_tests/group_by_key_test.py b/sdks/python/apache_beam/testing/load_tests/group_by_key_test.py
index fa8c765..588c8b1 100644
--- a/sdks/python/apache_beam/testing/load_tests/group_by_key_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/group_by_key_test.py
@@ -167,23 +167,22 @@ class GroupByKeyTest(unittest.TestCase):
'are empty.')
def testGroupByKey(self):
- with self.pipeline as p:
- # pylint: disable=expression-not-assigned
- (p
- | beam.io.Read(synthetic_pipeline.SyntheticSource(
- self.parseTestPipelineOptions()))
- | 'Measure time: Start' >> beam.ParDo(
- MeasureTime(self.metrics_namespace))
- | 'GroupByKey' >> beam.GroupByKey()
- | 'Ungroup' >> beam.FlatMap(
- lambda elm: [(elm[0], v) for v in elm[1]])
- | 'Measure time: End' >> beam.ParDo(MeasureTime(self.metrics_namespace))
- )
-
- result = p.run()
- result.wait_until_finish()
- if self.metrics_monitor is not None:
- self.metrics_monitor.send_metrics(result)
+ # pylint: disable=expression-not-assigned
+ (self.pipeline
+ | beam.io.Read(synthetic_pipeline.SyntheticSource(
+ self.parseTestPipelineOptions()))
+ | 'Measure time: Start' >> beam.ParDo(
+ MeasureTime(self.metrics_namespace))
+ | 'GroupByKey' >> beam.GroupByKey()
+ | 'Ungroup' >> beam.FlatMap(
+ lambda elm: [(elm[0], v) for v in elm[1]])
+ | 'Measure time: End' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+ )
+
+ result = self.pipeline.run()
+ result.wait_until_finish()
+ if self.metrics_monitor is not None:
+ self.metrics_monitor.send_metrics(result)
if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/testing/load_tests/pardo_test.py b/sdks/python/apache_beam/testing/load_tests/pardo_test.py
index e2f879e..52ae842 100644
--- a/sdks/python/apache_beam/testing/load_tests/pardo_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/pardo_test.py
@@ -191,38 +191,37 @@ class ParDoTest(unittest.TestCase):
else:
num_runs = int(self.iterations)
- with self.pipeline as p:
- pc = (p
- | 'Read synthetic' >> beam.io.Read(
- synthetic_pipeline.SyntheticSource(
- self.parseTestPipelineOptions()
- ))
- | 'Measure time: Start' >> beam.ParDo(
- MeasureTime(self.metrics_namespace))
+ pc = (self.pipeline
+ | 'Read synthetic' >> beam.io.Read(
+ synthetic_pipeline.SyntheticSource(
+ self.parseTestPipelineOptions()
+ ))
+ | 'Measure time: Start' >> beam.ParDo(
+ MeasureTime(self.metrics_namespace))
+ )
+
+ for i in range(num_runs):
+ is_returning = (i == (num_runs-1))
+ pc = (pc
+ | 'Step: %d' % i >> beam.ParDo(
+ _GetElement(), self.metrics_namespace, is_returning)
)
- for i in range(num_runs):
- is_returning = (i == (num_runs-1))
- pc = (pc
- | 'Step: %d' % i >> beam.ParDo(
- _GetElement(), self.metrics_namespace, is_returning)
- )
-
- if self.output is not None:
- pc = (pc
- | "Write" >> beam.io.WriteToText(self.output)
- )
-
- # pylint: disable=expression-not-assigned
- (pc
- | 'Measure time: End' >> beam.ParDo(MeasureTime(self.metrics_namespace))
- )
+ if self.output is not None:
+ pc = (pc
+ | "Write" >> beam.io.WriteToText(self.output)
+ )
+
+ # pylint: disable=expression-not-assigned
+ (pc
+ | 'Measure time: End' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+ )
- result = p.run()
- result.wait_until_finish()
+ result = self.pipeline.run()
+ result.wait_until_finish()
- if self.metrics_monitor is not None:
- self.metrics_monitor.send_metrics(result)
+ if self.metrics_monitor is not None:
+ self.metrics_monitor.send_metrics(result)
if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/testing/load_tests/sideinput_test.py b/sdks/python/apache_beam/testing/load_tests/sideinput_test.py
index 0ad355e..ba5d3b2 100644
--- a/sdks/python/apache_beam/testing/load_tests/sideinput_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/sideinput_test.py
@@ -201,36 +201,35 @@ class SideInputTest(unittest.TestCase):
list.append({key: element[1]+value})
yield list
- with self.pipeline as p:
- main_input = (p
- | "Read pcoll 1" >> beam.io.Read(
- synthetic_pipeline.SyntheticSource(
- self._parseTestPipelineOptions()))
- | 'Measure time: Start pcoll 1' >> beam.ParDo(
- MeasureTime(self.metrics_namespace))
- )
-
- side_input = (p
- | "Read pcoll 2" >> beam.io.Read(
- synthetic_pipeline.SyntheticSource(
- self._getSideInput()))
- | 'Measure time: Start pcoll 2' >> beam.ParDo(
- MeasureTime(self.metrics_namespace))
- )
- # pylint: disable=expression-not-assigned
- (main_input
- | "Merge" >> beam.ParDo(
- join_fn,
- AsIter(side_input),
- self.iterations)
- | 'Measure time' >> beam.ParDo(MeasureTime(self.metrics_namespace))
- )
-
- result = p.run()
- result.wait_until_finish()
-
- if self.metrics_monitor is not None:
- self.metrics_monitor.send_metrics(result)
+ main_input = (self.pipeline
+ | "Read pcoll 1" >> beam.io.Read(
+ synthetic_pipeline.SyntheticSource(
+ self._parseTestPipelineOptions()))
+ | 'Measure time: Start pcoll 1' >> beam.ParDo(
+ MeasureTime(self.metrics_namespace))
+ )
+
+ side_input = (self.pipeline
+ | "Read pcoll 2" >> beam.io.Read(
+ synthetic_pipeline.SyntheticSource(
+ self._getSideInput()))
+ | 'Measure time: Start pcoll 2' >> beam.ParDo(
+ MeasureTime(self.metrics_namespace))
+ )
+ # pylint: disable=expression-not-assigned
+ (main_input
+ | "Merge" >> beam.ParDo(
+ join_fn,
+ AsIter(side_input),
+ self.iterations)
+ | 'Measure time' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+ )
+
+ result = self.pipeline.run()
+ result.wait_until_finish()
+
+ if self.metrics_monitor is not None:
+ self.metrics_monitor.send_metrics(result)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)