You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink and is not preserved in this plain text version.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/08 16:48:22 UTC

[beam] branch master updated: [BEAM-6781] Changed -with- statement to run job in every load test.

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c0893c  [BEAM-6781] Changed -with- statement to run job in every load test.
     new 7f7dd70  Merge pull request #8019 from kkucharc/BEAM-6781-with-statement
9c0893c is described below

commit 9c0893cda58b8e2141c752ac03e72fa12ec03b51
Author: Kasia Kucharczyk <ka...@polidea.com>
AuthorDate: Fri Mar 8 15:52:25 2019 +0100

    [BEAM-6781] Changed -with- statement to run job in every load test.
---
 .../testing/load_tests/co_group_by_key_test.py     | 59 +++++++++++-----------
 .../apache_beam/testing/load_tests/combine_test.py | 33 ++++++------
 .../testing/load_tests/group_by_key_test.py        | 33 ++++++------
 .../apache_beam/testing/load_tests/pardo_test.py   | 55 ++++++++++----------
 .../testing/load_tests/sideinput_test.py           | 59 +++++++++++-----------
 5 files changed, 117 insertions(+), 122 deletions(-)

diff --git a/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py b/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py
index 3999844..6a86ccc 100644
--- a/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py
@@ -210,36 +210,35 @@ class CoGroupByKeyTest(unittest.TestCase):
         yield i
 
   def testCoGroupByKey(self):
-    with self.pipeline as p:
-      pc1 = (p
-             | 'Read ' + INPUT_TAG >> beam.io.Read(
-                 synthetic_pipeline.SyntheticSource(
-                     self.parseTestPipelineOptions(self.input_options)))
-             | 'Make ' + INPUT_TAG + ' iterable' >> beam.Map(lambda x: (x, x))
-             | 'Measure time: Start pc1' >> beam.ParDo(
-                 MeasureTime(self.metrics_namespace))
-            )
-
-      pc2 = (p
-             | 'Read ' + CO_INPUT_TAG >> beam.io.Read(
-                 synthetic_pipeline.SyntheticSource(
-                     self.parseTestPipelineOptions(self.co_input_options)))
-             | 'Make ' + CO_INPUT_TAG + ' iterable' >> beam.Map(
-                 lambda x: (x, x))
-             | 'Measure time: Start pc2' >> beam.ParDo(
-                 MeasureTime(self.metrics_namespace))
-            )
-      # pylint: disable=expression-not-assigned
-      ({INPUT_TAG: pc1, CO_INPUT_TAG: pc2}
-       | 'CoGroupByKey: ' >> beam.CoGroupByKey()
-       | 'Consume Joined Collections' >> beam.ParDo(self._Ungroup())
-       | 'Measure time: End' >> beam.ParDo(MeasureTime(self.metrics_namespace))
-      )
-
-      result = p.run()
-      result.wait_until_finish()
-      if self.metrics_monitor is not None:
-        self.metrics_monitor.send_metrics(result)
+    pc1 = (self.pipeline
+           | 'Read ' + INPUT_TAG >> beam.io.Read(
+               synthetic_pipeline.SyntheticSource(
+                   self.parseTestPipelineOptions(self.input_options)))
+           | 'Make ' + INPUT_TAG + ' iterable' >> beam.Map(lambda x: (x, x))
+           | 'Measure time: Start pc1' >> beam.ParDo(
+               MeasureTime(self.metrics_namespace))
+          )
+
+    pc2 = (self.pipeline
+           | 'Read ' + CO_INPUT_TAG >> beam.io.Read(
+               synthetic_pipeline.SyntheticSource(
+                   self.parseTestPipelineOptions(self.co_input_options)))
+           | 'Make ' + CO_INPUT_TAG + ' iterable' >> beam.Map(
+               lambda x: (x, x))
+           | 'Measure time: Start pc2' >> beam.ParDo(
+               MeasureTime(self.metrics_namespace))
+          )
+    # pylint: disable=expression-not-assigned
+    ({INPUT_TAG: pc1, CO_INPUT_TAG: pc2}
+     | 'CoGroupByKey: ' >> beam.CoGroupByKey()
+     | 'Consume Joined Collections' >> beam.ParDo(self._Ungroup())
+     | 'Measure time: End' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+    )
+
+    result = self.pipeline.run()
+    result.wait_until_finish()
+    if self.metrics_monitor is not None:
+      self.metrics_monitor.send_metrics(result)
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/testing/load_tests/combine_test.py b/sdks/python/apache_beam/testing/load_tests/combine_test.py
index e54acca..a5b18a1 100644
--- a/sdks/python/apache_beam/testing/load_tests/combine_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/combine_test.py
@@ -170,23 +170,22 @@ class CombineTest(unittest.TestCase):
       yield element
 
   def testCombineGlobally(self):
-    with self.pipeline as p:
-      # pylint: disable=expression-not-assigned
-      (p
-       | beam.io.Read(synthetic_pipeline.SyntheticSource(
-           self.parseTestPipelineOptions()))
-       | 'Measure time: Start' >> beam.ParDo(
-           MeasureTime(self.metrics_namespace))
-       | 'Combine with Top' >> beam.CombineGlobally(
-           beam.combiners.TopCombineFn(1000))
-       | 'Consume' >> beam.ParDo(self._GetElement())
-       | 'Measure time: End' >> beam.ParDo(MeasureTime(self.metrics_namespace))
-      )
-
-      result = p.run()
-      result.wait_until_finish()
-      if self.metrics_monitor is not None:
-        self.metrics_monitor.send_metrics(result)
+    # pylint: disable=expression-not-assigned
+    (self.pipeline
+     | beam.io.Read(synthetic_pipeline.SyntheticSource(
+         self.parseTestPipelineOptions()))
+     | 'Measure time: Start' >> beam.ParDo(
+         MeasureTime(self.metrics_namespace))
+     | 'Combine with Top' >> beam.CombineGlobally(
+         beam.combiners.TopCombineFn(1000))
+     | 'Consume' >> beam.ParDo(self._GetElement())
+     | 'Measure time: End' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+    )
+
+    result = self.pipeline.run()
+    result.wait_until_finish()
+    if self.metrics_monitor is not None:
+      self.metrics_monitor.send_metrics(result)
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/testing/load_tests/group_by_key_test.py b/sdks/python/apache_beam/testing/load_tests/group_by_key_test.py
index fa8c765..588c8b1 100644
--- a/sdks/python/apache_beam/testing/load_tests/group_by_key_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/group_by_key_test.py
@@ -167,23 +167,22 @@ class GroupByKeyTest(unittest.TestCase):
                        'are empty.')
 
   def testGroupByKey(self):
-    with self.pipeline as p:
-      # pylint: disable=expression-not-assigned
-      (p
-       | beam.io.Read(synthetic_pipeline.SyntheticSource(
-           self.parseTestPipelineOptions()))
-       | 'Measure time: Start' >> beam.ParDo(
-           MeasureTime(self.metrics_namespace))
-       | 'GroupByKey' >> beam.GroupByKey()
-       | 'Ungroup' >> beam.FlatMap(
-           lambda elm: [(elm[0], v) for v in elm[1]])
-       | 'Measure time: End' >> beam.ParDo(MeasureTime(self.metrics_namespace))
-      )
-
-      result = p.run()
-      result.wait_until_finish()
-      if self.metrics_monitor is not None:
-        self.metrics_monitor.send_metrics(result)
+    # pylint: disable=expression-not-assigned
+    (self.pipeline
+     | beam.io.Read(synthetic_pipeline.SyntheticSource(
+         self.parseTestPipelineOptions()))
+     | 'Measure time: Start' >> beam.ParDo(
+         MeasureTime(self.metrics_namespace))
+     | 'GroupByKey' >> beam.GroupByKey()
+     | 'Ungroup' >> beam.FlatMap(
+         lambda elm: [(elm[0], v) for v in elm[1]])
+     | 'Measure time: End' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+    )
+
+    result = self.pipeline.run()
+    result.wait_until_finish()
+    if self.metrics_monitor is not None:
+      self.metrics_monitor.send_metrics(result)
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/testing/load_tests/pardo_test.py b/sdks/python/apache_beam/testing/load_tests/pardo_test.py
index e2f879e..52ae842 100644
--- a/sdks/python/apache_beam/testing/load_tests/pardo_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/pardo_test.py
@@ -191,38 +191,37 @@ class ParDoTest(unittest.TestCase):
     else:
       num_runs = int(self.iterations)
 
-    with self.pipeline as p:
-      pc = (p
-            | 'Read synthetic' >> beam.io.Read(
-                synthetic_pipeline.SyntheticSource(
-                    self.parseTestPipelineOptions()
-                ))
-            | 'Measure time: Start' >> beam.ParDo(
-                MeasureTime(self.metrics_namespace))
+    pc = (self.pipeline
+          | 'Read synthetic' >> beam.io.Read(
+              synthetic_pipeline.SyntheticSource(
+                  self.parseTestPipelineOptions()
+              ))
+          | 'Measure time: Start' >> beam.ParDo(
+              MeasureTime(self.metrics_namespace))
+         )
+
+    for i in range(num_runs):
+      is_returning = (i == (num_runs-1))
+      pc = (pc
+            | 'Step: %d' % i >> beam.ParDo(
+                _GetElement(), self.metrics_namespace, is_returning)
            )
 
-      for i in range(num_runs):
-        is_returning = (i == (num_runs-1))
-        pc = (pc
-              | 'Step: %d' % i >> beam.ParDo(
-                  _GetElement(), self.metrics_namespace, is_returning)
-             )
-
-      if self.output is not None:
-        pc = (pc
-              | "Write" >> beam.io.WriteToText(self.output)
-             )
-
-      # pylint: disable=expression-not-assigned
-      (pc
-       | 'Measure time: End' >> beam.ParDo(MeasureTime(self.metrics_namespace))
-      )
+    if self.output is not None:
+      pc = (pc
+            | "Write" >> beam.io.WriteToText(self.output)
+           )
+
+    # pylint: disable=expression-not-assigned
+    (pc
+     | 'Measure time: End' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+    )
 
-      result = p.run()
-      result.wait_until_finish()
+    result = self.pipeline.run()
+    result.wait_until_finish()
 
-      if self.metrics_monitor is not None:
-        self.metrics_monitor.send_metrics(result)
+    if self.metrics_monitor is not None:
+      self.metrics_monitor.send_metrics(result)
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/testing/load_tests/sideinput_test.py b/sdks/python/apache_beam/testing/load_tests/sideinput_test.py
index 0ad355e..ba5d3b2 100644
--- a/sdks/python/apache_beam/testing/load_tests/sideinput_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/sideinput_test.py
@@ -201,36 +201,35 @@ class SideInputTest(unittest.TestCase):
             list.append({key: element[1]+value})
       yield list
 
-    with self.pipeline as p:
-      main_input = (p
-                    | "Read pcoll 1" >> beam.io.Read(
-                        synthetic_pipeline.SyntheticSource(
-                            self._parseTestPipelineOptions()))
-                    | 'Measure time: Start pcoll 1' >> beam.ParDo(
-                        MeasureTime(self.metrics_namespace))
-                   )
-
-      side_input = (p
-                    | "Read pcoll 2" >> beam.io.Read(
-                        synthetic_pipeline.SyntheticSource(
-                            self._getSideInput()))
-                    | 'Measure time: Start pcoll 2' >> beam.ParDo(
-                        MeasureTime(self.metrics_namespace))
-                   )
-      # pylint: disable=expression-not-assigned
-      (main_input
-       | "Merge" >> beam.ParDo(
-           join_fn,
-           AsIter(side_input),
-           self.iterations)
-       | 'Measure time' >> beam.ParDo(MeasureTime(self.metrics_namespace))
-      )
-
-      result = p.run()
-      result.wait_until_finish()
-
-      if self.metrics_monitor is not None:
-        self.metrics_monitor.send_metrics(result)
+    main_input = (self.pipeline
+                  | "Read pcoll 1" >> beam.io.Read(
+                      synthetic_pipeline.SyntheticSource(
+                          self._parseTestPipelineOptions()))
+                  | 'Measure time: Start pcoll 1' >> beam.ParDo(
+                      MeasureTime(self.metrics_namespace))
+                 )
+
+    side_input = (self.pipeline
+                  | "Read pcoll 2" >> beam.io.Read(
+                      synthetic_pipeline.SyntheticSource(
+                          self._getSideInput()))
+                  | 'Measure time: Start pcoll 2' >> beam.ParDo(
+                      MeasureTime(self.metrics_namespace))
+                 )
+    # pylint: disable=expression-not-assigned
+    (main_input
+     | "Merge" >> beam.ParDo(
+         join_fn,
+         AsIter(side_input),
+         self.iterations)
+     | 'Measure time' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+    )
+
+    result = self.pipeline.run()
+    result.wait_until_finish()
+
+    if self.metrics_monitor is not None:
+      self.metrics_monitor.send_metrics(result)
 
   if __name__ == '__main__':
     logging.getLogger().setLevel(logging.DEBUG)