You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ka...@apache.org on 2020/10/03 13:42:36 UTC

[beam] branch master updated: [BEAM-7505] SideInput Python Load tests job (#11856)

This is an automated email from the ASF dual-hosted git repository.

kamilwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f65a0f  [BEAM-7505] SideInput Python Load tests job (#11856)
7f65a0f is described below

commit 7f65a0f81385810008b8576cc906edb880beebe6
Author: Kamil Wasilewski <ka...@polidea.com>
AuthorDate: Sat Oct 3 15:41:55 2020 +0200

    [BEAM-7505] SideInput Python Load tests job (#11856)
---
 .../jenkins/job_LoadTests_SideInput_Python.groovy  | 188 +++++++++++++++++++++
 .../testing/load_tests/sideinput_test.py           | 169 +++++++++---------
 2 files changed, 278 insertions(+), 79 deletions(-)

diff --git a/.test-infra/jenkins/job_LoadTests_SideInput_Python.groovy b/.test-infra/jenkins/job_LoadTests_SideInput_Python.groovy
new file mode 100644
index 0000000..41fe66d
--- /dev/null
+++ b/.test-infra/jenkins/job_LoadTests_SideInput_Python.groovy
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as commonJobProperties
+import LoadTestsBuilder as loadTestsBuilder
+import PhraseTriggeringPostCommitBuilder
+import InfluxDBCredentialsHelper
+
+def now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
+
+def fromTemplate = { mode, name, id, datasetName, testSpecificOptions ->
+  [
+    title          : "SideInput Python Load test: ${name}",
+    test           : 'apache_beam.testing.load_tests.sideinput_test',
+    runner         : CommonTestProperties.Runner.DATAFLOW,
+    pipelineOptions: [
+      job_name             : "load-tests-python-dataflow-${mode}-sideinput-${id}-${now}",
+      project              : 'apache-beam-testing',
+      region               : 'us-central1',
+      temp_location        : 'gs://temp-storage-for-perf-tests/loadtests',
+      publish_to_big_query : true,
+      metrics_dataset      : datasetName,
+      metrics_table        : "python_dataflow_${mode}_sideinput_${id}",
+      influx_measurement   : "python_${mode}_sideinput_${id}",
+      num_workers          : 10,
+      autoscaling_algorithm: 'NONE',
+      experiments          : 'use_runner_v2',
+    ] << testSpecificOptions
+  ]
+}
+
+def loadTestConfigurations = { mode, datasetName ->
+  [
+    [
+      name: '1gb-1kb-10workers-1window-1key-percent-dict',
+      testSpecificOptions: [
+        input_options    : '\'{' +
+        '"num_records": 1000000,' +
+        '"key_size": 100,' +
+        '"value_size": 900}\'',
+        side_input_type  : 'dict',
+        access_percentage: 1,
+      ]
+    ],
+    [
+      name: '1gb-1kb-10workers-1window-99key-percent-dict',
+      testSpecificOptions: [
+        input_options    : '\'{' +
+        '"num_records": 1000000,' +
+        '"key_size": 100,' +
+        '"value_size": 900}\'',
+        side_input_type  : 'dict',
+        access_percentage: 99,
+      ]
+    ],
+    [
+      name: '10gb-1kb-10workers-1window-first-iterable',
+      testSpecificOptions: [
+        input_options    : '\'{' +
+        '"num_records": 10000000,' +
+        '"key_size": 100,' +
+        '"value_size": 900}\'',
+        side_input_type  : 'iter',
+        access_percentage: 1,
+      ]
+    ],
+    [
+      name: '10gb-1kb-10workers-1window-iterable',
+      testSpecificOptions: [
+        input_options    : '\'{' +
+        '"num_records": 10000000,' +
+        '"key_size": 100,' +
+        '"value_size": 900}\'',
+        side_input_type  : 'iter',
+      ]
+    ],
+    [
+      name: '1gb-1kb-10workers-1window-first-list',
+      testSpecificOptions: [
+        input_options    : '\'{' +
+        '"num_records": 1000000,' +
+        '"key_size": 100,' +
+        '"value_size": 900}\'',
+        side_input_type  : 'list',
+        access_percentage: 1,
+      ]
+    ],
+    [
+      name: '1gb-1kb-10workers-1window-list',
+      testSpecificOptions: [
+        input_options    : '\'{' +
+        '"num_records": 1000000,' +
+        '"key_size": 100,' +
+        '"value_size": 900}\'',
+        side_input_type  : 'list',
+      ]
+    ],
+    [
+      name: '1gb-1kb-10workers-1000window-1key-percent-dict',
+      testSpecificOptions: [
+        input_options    : '\'{' +
+        '"num_records": 1000000,' +
+        '"key_size": 100,' +
+        '"value_size": 900}\'',
+        side_input_type  : 'dict',
+        access_percentage: 1,
+        window_count     : 1000,
+      ]
+    ],
+    [
+      name: '1gb-1kb-10workers-1000window-99key-percent-dict',
+      testSpecificOptions: [
+        input_options    : '\'{' +
+        '"num_records": 1000000,' +
+        '"key_size": 100,' +
+        '"value_size": 900}\'',
+        side_input_type  : 'dict',
+        access_percentage: 99,
+        window_count     : 1000,
+      ]
+    ],
+    [
+      name: '10gb-1kb-10workers-1000window-first-iterable',
+      testSpecificOptions: [
+        input_options    : '\'{' +
+        '"num_records": 10000000,' +
+        '"key_size": 100,' +
+        '"value_size": 900}\'',
+        side_input_type  : 'iter',
+        access_percentage: 1,
+        window_count     : 1000,
+      ]
+    ],
+    [
+      name: '10gb-1kb-10workers-1000window-iterable',
+      testSpecificOptions: [
+        input_options    : '\'{' +
+        '"num_records": 10000000,' +
+        '"key_size": 100,' +
+        '"value_size": 900}\'',
+        side_input_type  : 'iter',
+        window_count     : 1000,
+      ]
+    ],
+  ].indexed().collect { index, it ->
+    fromTemplate(mode, it.name, index + 1, datasetName, it.testSpecificOptions << additionalPipelineArgs)
+  }
+}
+
+
+def loadTestJob = { scope, triggeringContext, mode ->
+  def datasetName = loadTestsBuilder.getBigQueryDataset('load_test', triggeringContext)
+  loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.PYTHON_37,
+      loadTestConfigurations(mode, datasetName), 'SideInput', mode)
+}
+
+PhraseTriggeringPostCommitBuilder.postCommitJob(
+    'beam_LoadTests_Python_SideInput_Dataflow_Batch',
+    'Run Python Load Tests SideInput Dataflow Batch',
+    'Load Tests Python SideInput Dataflow Batch suite',
+    this
+    ) {
+      additionalPipelineArgs = [:]
+      loadTestJob(delegate, CommonTestProperties.TriggeringContext.PR, 'batch')
+    }
+
+CronJobBuilder.cronJob('beam_LoadTests_Python_SideInput_Dataflow_Batch', 'H 11 * * *', this) {
+  additionalPipelineArgs = [
+    influx_db_name: InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+    influx_hostname: InfluxDBCredentialsHelper.InfluxDBHostUrl,
+  ]
+  loadTestJob(delegate, CommonTestProperties.TriggeringContext.POST_COMMIT, 'batch')
+}
diff --git a/sdks/python/apache_beam/testing/load_tests/sideinput_test.py b/sdks/python/apache_beam/testing/load_tests/sideinput_test.py
index 987c1ad..28dd16c 100644
--- a/sdks/python/apache_beam/testing/load_tests/sideinput_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/sideinput_test.py
@@ -27,11 +27,7 @@ This test can accept the following parameters:
   * side_input_type (str) - Required. Specifies how the side input will be
     materialized in ParDo operation. Choose from (dict, iter, list).
   * window_count (int) - The number of fixed sized windows to subdivide the
-    side input into. By default, no windows will be used.
-  * side_input_size (int) - The size of the side input. Must be equal to or
-    lower than the size of the main input. If lower, the side input will be
-    created by applying a :class:`beam.combiners.Sample
-    <apache_beam.transforms.combiners.Sample>` transform.
+    side input into. By default, a global window will be used.
   * access_percentage (int) - Specifies the percentage of elements in the side
     input to be accessed. By default, all elements will be accessed.
 
@@ -64,11 +60,17 @@ from __future__ import absolute_import
 from __future__ import division
 
 import logging
+from typing import Any
+from typing import Dict
+from typing import Iterable
+from typing import Tuple
+from typing import Union
 
 import apache_beam as beam
 from apache_beam.testing.load_tests.load_test import LoadTest
 from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
-from apache_beam.testing.synthetic_pipeline import SyntheticSource
+from apache_beam.testing.synthetic_pipeline import SyntheticSDFAsSource
+from apache_beam.transforms import window
 
 
 class SideInputTest(LoadTest):
@@ -77,10 +79,12 @@ class SideInputTest(LoadTest):
       'list': beam.pvalue.AsList,
       'dict': beam.pvalue.AsDict,
   }
+  SDF_INITIAL_ELEMENTS = 1000
 
   def __init__(self):
     super(SideInputTest, self).__init__()
-    self.windows = self.get_option_or_default('window_count', default=0)
+    self.windows = self.get_option_or_default('window_count', default=1)
+
     self.access_percentage = self.get_option_or_default(
         'access_percentage', default=100)
     if self.access_percentage < 0 or self.access_percentage > 100:
@@ -88,10 +92,7 @@ class SideInputTest(LoadTest):
           'access_percentage: Invalid value. Should be in range '
           'from 0 to 100, got {} instead'.format(self.access_percentage))
 
-    self.side_input_size = self.get_option_or_default(
-        'side_input_size', default=0)
-    if self.side_input_size == 0:
-      self.side_input_size = self.input_options.get('num_records')
+    self.elements_per_window = self.input_options['num_records'] // self.windows
 
     self.side_input_type = self.pipeline.get_option('side_input_type')
     if self.side_input_type is None:
@@ -110,10 +111,12 @@ class SideInputTest(LoadTest):
   def test(self):
     class SequenceSideInputTestDoFn(beam.DoFn):
       """Iterate over first n side_input elements."""
-      def __init__(self, first_n):
+      def __init__(self, first_n: int):
         self._first_n = first_n
 
-      def process(self, unused_element, side_input):
+      def process(  # type: ignore[override]
+          self, element: Any, side_input: Iterable[Tuple[bytes,
+                                                         bytes]]) -> None:
         i = 0
         it = iter(side_input)
         while i < self._first_n:
@@ -122,85 +125,93 @@ class SideInputTest(LoadTest):
             # No-op. We only make sure that the element is accessed.
             next(it)
           except StopIteration:
-            return
+            break
 
     class MappingSideInputTestDoFn(beam.DoFn):
-      """Take a sequence of keys as an additional side input and for each
-      key in the sequence checks the value for key in the dictionary."""
-      def process(self, unused_element, dict_side_input, keys_to_check):
-        for key in keys_to_check:
+      """Iterates over first n keys in the dictionary and checks the value."""
+      def __init__(self, first_n: int):
+        self._first_n = first_n
+
+      def process(  # type: ignore[override]
+          self, element: Any, dict_side_input: Dict[bytes, bytes]) -> None:
+        i = 0
+        for key in dict_side_input:
+          if i == self._first_n:
+            break
           # No-op. We only make sure that the element is accessed.
           dict_side_input[key]
+          i += 1
 
-    class GetRandomKeys(beam.DoFn):
-      def __init__(self, n):
-        self._n = n
-
-      def process(self, unused_element, dict_side_input):
-        import random
-        n = min(self._n, len(dict_side_input))
-        return random.sample(dict_side_input.keys(), n)
-
-    class AddEventTimestamps(beam.DoFn):
-      """Assign timestamp to each element of PCollection."""
-      def setup(self):
-        self._timestamp = 0
-
-      def process(self, element):
-        from apache_beam.transforms.combiners import window
-        yield window.TimestampedValue(element, self._timestamp)
-        self._timestamp += 1
-
-    input_pc = (
-        self.pipeline
-        | 'Read synthetic' >> beam.io.Read(
-            SyntheticSource(self.parse_synthetic_source_options()))
-        | 'Collect start time metrics' >> beam.ParDo(
-            MeasureTime(self.metrics_namespace)))
-
-    if self.side_input_size != self.input_options.get('num_records'):
-      side_input = (
-          input_pc
-          | 'Sample {} elements'.format(self.side_input_size) >>
-          beam.combiners.Sample.FixedSizeGlobally(self.side_input_size)
-          | 'Flatten a sequence' >> beam.FlatMap(lambda x: x))
+    class AssignTimestamps(beam.DoFn):
+      """Produces timestamped values. Timestamps are equal to the value of the
+      element."""
+      def __init__(self):
+        # Avoid having to use save_main_session
+        self.window = window
+
+      def process(self, element: int) -> Iterable[window.TimestampedValue]:  # type: ignore[override]
+        yield self.window.TimestampedValue(element, element)
+
+    class GetSyntheticSDFOptions(beam.DoFn):
+      def __init__(
+          self, elements_per_record: int, key_size: int, value_size: int):
+        self.elements_per_record = elements_per_record
+        self.key_size = key_size
+        self.value_size = value_size
+
+      def process(self, element: Any) -> Iterable[Dict[str, Union[int, str]]]:  # type: ignore[override]
+        yield {
+            'num_records': self.elements_per_record,
+            'key_size': self.key_size,
+            'value_size': self.value_size,
+            'initial_splitting_num_bundles': 0,
+            'initial_splitting_desired_bundle_size': 0,
+            'sleep_per_input_record_sec': 0,
+            'initial_splitting': 'const'
+        }
+
+    main_input = self.pipeline | 'Create' >> beam.Create(range(self.windows))
+
+    initial_elements = self.SDF_INITIAL_ELEMENTS
+    if self.windows > 1:
+      main_input = (
+          main_input
+          | 'Assign timestamps' >> beam.ParDo(AssignTimestamps())
+          | 'Apply windows' >> beam.WindowInto(window.FixedWindows(1)))
+      side_input = main_input
+      initial_elements = self.windows
     else:
-      side_input = input_pc
-
-    if self.windows > 0:
-      window_size = self.side_input_size / self.windows
-      logging.info('Fixed windows of %s seconds will be applied', window_size)
-      side_input = (
-          side_input
-          | 'Add event timestamps' >> beam.ParDo(AddEventTimestamps())
-          | 'Apply windows' >> beam.WindowInto(
-              beam.combiners.window.FixedWindows(window_size)))
+      side_input = self.pipeline | 'Side input: create' >> beam.Create(
+          range(initial_elements))
+
+    side_input = (
+        side_input
+        | 'Get synthetic SDF options' >> beam.ParDo(
+            GetSyntheticSDFOptions(
+                self.input_options['num_records'] // initial_elements,
+                self.input_options['key_size'],
+                self.input_options['value_size']))
+        | 'Generate input' >> beam.ParDo(SyntheticSDFAsSource()))
+    main_input |= 'Collect start time metrics' >> beam.ParDo(
+        MeasureTime(self.metrics_namespace))
 
     side_input_type = self.materialize_as()
-    elements_to_access = self.side_input_size * self.access_percentage // 100
+    elements_to_access = self.elements_per_window * \
+                         self.access_percentage // 100
     logging.info(
-        '%s out of %s total elements in the side input will be '
-        'accessed.',
+        '%s out of %s total elements in each window will be accessed.',
         elements_to_access,
-        self.side_input_size)
+        self.elements_per_window)
     if side_input_type is beam.pvalue.AsDict:
-      random_keys = (
-          self.pipeline
-          | beam.Impulse()
-          | 'Get random keys' >> beam.ParDo(
-              GetRandomKeys(elements_to_access), beam.pvalue.AsDict(side_input))
-      )
-      pc = input_pc | beam.ParDo(
-          MappingSideInputTestDoFn(),
-          side_input_type(side_input),
-          beam.pvalue.AsList(random_keys))
+      dofn = MappingSideInputTestDoFn(elements_to_access)
     else:
-      pc = input_pc | beam.ParDo(
-          SequenceSideInputTestDoFn(elements_to_access),
-          side_input_type(side_input))
+      dofn = SequenceSideInputTestDoFn(elements_to_access)
 
-    _ = pc | 'Collect end time metrics' >> beam.ParDo(
-        MeasureTime(self.metrics_namespace))
+    _ = (
+        main_input
+        | beam.ParDo(dofn, side_input_type(side_input))
+        | 'Collect end time metrics' >> beam.ParDo(
+            MeasureTime(self.metrics_namespace)))
 
 
 if __name__ == '__main__':