Posted to commits@beam.apache.org by ka...@apache.org on 2020/03/26 10:33:37 UTC

[beam] branch master updated: [BEAM-7505] Add side input load test to Python SDK (#11136)

This is an automated email from the ASF dual-hosted git repository.

kamilwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 789d2ee  [BEAM-7505] Add side input load test to Python SDK  (#11136)
789d2ee is described below

commit 789d2ee3791f711d8f2b681ed85d261671d9476f
Author: Kamil Wasilewski <ka...@polidea.com>
AuthorDate: Thu Mar 26 11:33:16 2020 +0100

    [BEAM-7505] Add side input load test to Python SDK  (#11136)
---
 .../testing/load_tests/sideinput_test.py           | 195 +++++++++++++++------
 1 file changed, 145 insertions(+), 50 deletions(-)

diff --git a/sdks/python/apache_beam/testing/load_tests/sideinput_test.py b/sdks/python/apache_beam/testing/load_tests/sideinput_test.py
index ead3cc4..987c1ad 100644
--- a/sdks/python/apache_beam/testing/load_tests/sideinput_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/sideinput_test.py
@@ -16,28 +16,30 @@
 #
 
 """
-This is SideInput load test with Synthetic Source. Besides of the standard
-input options there are additional options:
-* number_of_counter_operations - number of pardo operations
-* project (optional) - the gcp project in case of saving
-metrics in Big Query (in case of Dataflow Runner
-it is required to specify project of runner),
-* publish_to_big_query - if metrics should be published in big query,
-* metrics_namespace (optional) - name of BigQuery dataset where metrics
-will be stored,
-* metrics_table (optional) - name of BigQuery table where metrics
-will be stored,
-* input_options - options for Synthetic Sources.
+Load test for operations involving side inputs.
+
+The purpose of this test is to measure the cost of materializing and
+accessing side inputs. The test uses a synthetic source which can be
+parametrized to generate records with keys and values of various sizes,
+to impose delays in the pipeline, and to simulate other performance challenges.
+
+This test can accept the following parameters:
+  * side_input_type (str) - Required. Specifies how the side input will be
+    materialized in a ParDo operation. Choose from (dict, iter, list).
+  * window_count (int) - The number of fixed-size windows to subdivide the
+    side input into. By default, no windows will be used.
+  * side_input_size (int) - The size of the side input. Must be equal to or
+    lower than the size of the main input. If lower, the side input will be
+    created by applying a :class:`beam.combiners.Sample
+    <apache_beam.transforms.combiners.Sample>` transform.
+  * access_percentage (int) - Specifies the percentage of elements in the side
+    input to be accessed. By default, all elements will be accessed.
 
 Example test run:
 
 python -m apache_beam.testing.load_tests.sideinput_test \
     --test-pipeline-options="
-    --project=big-query-project
-    --publish_to_big_query=true
-    --metrics_dataset=python_load_tests
-    --metrics_table=side_input
-    --number_of_counter_operations=1000
+    --side_input_type=iter
     --input_options='{
     \"num_records\": 300,
     \"key_size\": 5,
@@ -47,15 +49,11 @@ python -m apache_beam.testing.load_tests.sideinput_test \
 or:
 
 ./gradlew -PloadTest.args="
-    --publish_to_big_query=true
-    --project=...
-    --metrics_dataset=python_load_tests
-    --metrics_table=side_input
+    --side_input_type=iter
     --input_options='{
-      \"num_records\": 1,
-      \"key_size\": 1,
-      \"value_size\": 1}'
-    --runner=DirectRunner" \
+      \"num_records\": 300,
+      \"key_size\": 5,
+      \"value_size\": 15}'" \
 -PloadTest.mainClass=apache_beam.testing.load_tests.sideinput_test \
 -Prunner=DirectRunner :sdks:python:apache_beam:testing:load_tests:run
 """
@@ -63,49 +61,146 @@ or:
 # pytype: skip-file
 
 from __future__ import absolute_import
+from __future__ import division
 
 import logging
 
 import apache_beam as beam
-from apache_beam.pvalue import AsIter
 from apache_beam.testing.load_tests.load_test import LoadTest
 from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
 from apache_beam.testing.synthetic_pipeline import SyntheticSource
 
 
 class SideInputTest(LoadTest):
+  SIDE_INPUT_TYPES = {
+      'iter': beam.pvalue.AsIter,
+      'list': beam.pvalue.AsList,
+      'dict': beam.pvalue.AsDict,
+  }
+
   def __init__(self):
     super(SideInputTest, self).__init__()
-    self.iterations = self.get_option_or_default(
-        'number_of_counter_operations', 1)
+    self.windows = self.get_option_or_default('window_count', default=0)
+    self.access_percentage = self.get_option_or_default(
+        'access_percentage', default=100)
+    if self.access_percentage < 0 or self.access_percentage > 100:
+      raise ValueError(
+          'access_percentage: Invalid value. Should be in range '
+          'from 0 to 100, got {} instead'.format(self.access_percentage))
+
+    self.side_input_size = self.get_option_or_default(
+        'side_input_size', default=0)
+    if self.side_input_size == 0:
+      self.side_input_size = self.input_options.get('num_records')
+
+    self.side_input_type = self.pipeline.get_option('side_input_type')
+    if self.side_input_type is None:
+      raise ValueError(
+          'side_input_type is required. Please provide one of '
+          'these: {}'.format(list(self.SIDE_INPUT_TYPES.keys())))
+
+  def materialize_as(self):
+    try:
+      return self.SIDE_INPUT_TYPES[self.side_input_type]
+    except KeyError:
+      raise ValueError(
+          'Unknown side input type. Please provide one of '
+          'these: {}'.format(list(self.SIDE_INPUT_TYPES.keys())))
 
   def test(self):
-    def join_fn(element, side_input, iterations):
-      result = []
-      for i in range(iterations):
-        for key, value in side_input:
-          if i == iterations - 1:
-            result.append({key: element[1] + value})
-      yield result
-
-    main_input = (
+    class SequenceSideInputTestDoFn(beam.DoFn):
+      """Iterate over first n side_input elements."""
+      def __init__(self, first_n):
+        self._first_n = first_n
+
+      def process(self, unused_element, side_input):
+        i = 0
+        it = iter(side_input)
+        while i < self._first_n:
+          i += 1
+          try:
+            # No-op. We only make sure that the element is accessed.
+            next(it)
+          except StopIteration:
+            return
+
+    class MappingSideInputTestDoFn(beam.DoFn):
+      """Take a sequence of keys as an additional side input and for each
+      key in the sequence checks the value for key in the dictionary."""
+      def process(self, unused_element, dict_side_input, keys_to_check):
+        for key in keys_to_check:
+          # No-op. We only make sure that the element is accessed.
+          dict_side_input[key]
+
+    class GetRandomKeys(beam.DoFn):
+      def __init__(self, n):
+        self._n = n
+
+      def process(self, unused_element, dict_side_input):
+        import random
+        n = min(self._n, len(dict_side_input))
+        return random.sample(dict_side_input.keys(), n)
+
+    class AddEventTimestamps(beam.DoFn):
+      """Assign timestamp to each element of PCollection."""
+      def setup(self):
+        self._timestamp = 0
+
+      def process(self, element):
+        from apache_beam.transforms.combiners import window
+        yield window.TimestampedValue(element, self._timestamp)
+        self._timestamp += 1
+
+    input_pc = (
         self.pipeline
-        | "Read pcoll 1" >> beam.io.Read(
+        | 'Read synthetic' >> beam.io.Read(
             SyntheticSource(self.parse_synthetic_source_options()))
-        | 'Measure time: Start pcoll 1' >> beam.ParDo(
+        | 'Collect start time metrics' >> beam.ParDo(
             MeasureTime(self.metrics_namespace)))
 
-    side_input = (
-        self.pipeline
-        | "Read pcoll 2" >> beam.io.Read(
-            SyntheticSource(self.parse_synthetic_source_options()))
-        | 'Measure time: Start pcoll 2' >> beam.ParDo(
-            MeasureTime(self.metrics_namespace)))
-    # pylint: disable=expression-not-assigned
-    (
-        main_input
-        | "Merge" >> beam.ParDo(join_fn, AsIter(side_input), self.iterations)
-        | 'Measure time' >> beam.ParDo(MeasureTime(self.metrics_namespace)))
+    if self.side_input_size != self.input_options.get('num_records'):
+      side_input = (
+          input_pc
+          | 'Sample {} elements'.format(self.side_input_size) >>
+          beam.combiners.Sample.FixedSizeGlobally(self.side_input_size)
+          | 'Flatten a sequence' >> beam.FlatMap(lambda x: x))
+    else:
+      side_input = input_pc
+
+    if self.windows > 0:
+      window_size = self.side_input_size / self.windows
+      logging.info('Fixed windows of %s seconds will be applied', window_size)
+      side_input = (
+          side_input
+          | 'Add event timestamps' >> beam.ParDo(AddEventTimestamps())
+          | 'Apply windows' >> beam.WindowInto(
+              beam.combiners.window.FixedWindows(window_size)))
+
+    side_input_type = self.materialize_as()
+    elements_to_access = self.side_input_size * self.access_percentage // 100
+    logging.info(
+        '%s out of %s total elements in the side input will be '
+        'accessed.',
+        elements_to_access,
+        self.side_input_size)
+    if side_input_type is beam.pvalue.AsDict:
+      random_keys = (
+          self.pipeline
+          | beam.Impulse()
+          | 'Get random keys' >> beam.ParDo(
+              GetRandomKeys(elements_to_access), beam.pvalue.AsDict(side_input))
+      )
+      pc = input_pc | beam.ParDo(
+          MappingSideInputTestDoFn(),
+          side_input_type(side_input),
+          beam.pvalue.AsList(random_keys))
+    else:
+      pc = input_pc | beam.ParDo(
+          SequenceSideInputTestDoFn(elements_to_access),
+          side_input_type(side_input))
+
+    _ = pc | 'Collect end time metrics' >> beam.ParDo(
+        MeasureTime(self.metrics_namespace))
 
 
 if __name__ == '__main__':
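
The SIDE_INPUT_TYPES mapping in the new test corresponds to Beam's three
standard side input materializations. As a minimal standalone sketch of how
they differ (the pipeline and data below are illustrative assumptions, not
part of the commit; runs on the default DirectRunner):

import apache_beam as beam

with beam.Pipeline() as p:
  main = p | 'Main input' >> beam.Create(['a', 'b'])
  side = p | 'Side input' >> beam.Create([('k1', 1), ('k2', 2)])

  # iter: a lazy iterable; elements are fetched as the DoFn iterates over
  # them, which is the access pattern SequenceSideInputTestDoFn measures.
  _ = main | 'Use AsIter' >> beam.Map(
      lambda el, s: (el, sum(v for _, v in s)), beam.pvalue.AsIter(side))

  # list: fully materialized before the DoFn runs; supports len() and
  # indexing.
  _ = main | 'Use AsList' >> beam.Map(
      lambda el, s: (el, len(s)), beam.pvalue.AsList(side))

  # dict: built from (key, value) pairs; supports random-access lookups,
  # which is why the test samples concrete keys with GetRandomKeys first.
  _ = main | 'Use AsDict' >> beam.Map(
      lambda el, s: (el, s['k1']), beam.pvalue.AsDict(side))

A note on the windowed case: AddEventTimestamps assigns timestamps one second
apart, so applying FixedWindows(side_input_size / window_count) splits the
side input into exactly window_count windows, matching the window_count
option described in the docstring.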