You are viewing a plain text version of this content; the canonical (hyperlinked) version is available in the Apache mailing list archive.
Posted to commits@beam.apache.org by ka...@apache.org on 2020/10/03 13:42:36 UTC
[beam] branch master updated: [BEAM-7505] SideInput Python Load
tests job (#11856)
This is an automated email from the ASF dual-hosted git repository.
kamilwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7f65a0f [BEAM-7505] SideInput Python Load tests job (#11856)
7f65a0f is described below
commit 7f65a0f81385810008b8576cc906edb880beebe6
Author: Kamil Wasilewski <ka...@polidea.com>
AuthorDate: Sat Oct 3 15:41:55 2020 +0200
[BEAM-7505] SideInput Python Load tests job (#11856)
---
.../jenkins/job_LoadTests_SideInput_Python.groovy | 188 +++++++++++++++++++++
.../testing/load_tests/sideinput_test.py | 169 +++++++++---------
2 files changed, 278 insertions(+), 79 deletions(-)
diff --git a/.test-infra/jenkins/job_LoadTests_SideInput_Python.groovy b/.test-infra/jenkins/job_LoadTests_SideInput_Python.groovy
new file mode 100644
index 0000000..41fe66d
--- /dev/null
+++ b/.test-infra/jenkins/job_LoadTests_SideInput_Python.groovy
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as commonJobProperties
+import LoadTestsBuilder as loadTestsBuilder
+import PhraseTriggeringPostCommitBuilder
+import InfluxDBCredentialsHelper
+
+def now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
+
+def fromTemplate = { mode, name, id, datasetName, testSpecificOptions ->
+ [
+ title : "SideInput Python Load test: ${name}",
+ test : 'apache_beam.testing.load_tests.sideinput_test',
+ runner : CommonTestProperties.Runner.DATAFLOW,
+ pipelineOptions: [
+ job_name : "load-tests-python-dataflow-${mode}-sideinput-${id}-${now}",
+ project : 'apache-beam-testing',
+ region : 'us-central1',
+ temp_location : 'gs://temp-storage-for-perf-tests/loadtests',
+ publish_to_big_query : true,
+ metrics_dataset : datasetName,
+ metrics_table : "python_dataflow_${mode}_sideinput_${id}",
+ influx_measurement : "python_${mode}_sideinput_${id}",
+ num_workers : 10,
+ autoscaling_algorithm: 'NONE',
+ experiments : 'use_runner_v2',
+ ] << testSpecificOptions
+ ]
+}
+
+def loadTestConfigurations = { mode, datasetName ->
+ [
+ [
+ name: '1gb-1kb-10workers-1window-1key-percent-dict',
+ testSpecificOptions: [
+ input_options : '\'{' +
+ '"num_records": 1000000,' +
+ '"key_size": 100,' +
+ '"value_size": 900}\'',
+ side_input_type : 'dict',
+ access_percentage: 1,
+ ]
+ ],
+ [
+ name: '1gb-1kb-10workers-1window-99key-percent-dict',
+ testSpecificOptions: [
+ input_options : '\'{' +
+ '"num_records": 1000000,' +
+ '"key_size": 100,' +
+ '"value_size": 900}\'',
+ side_input_type : 'dict',
+ access_percentage: 99,
+ ]
+ ],
+ [
+ name: '10gb-1kb-10workers-1window-first-iterable',
+ testSpecificOptions: [
+ input_options : '\'{' +
+ '"num_records": 10000000,' +
+ '"key_size": 100,' +
+ '"value_size": 900}\'',
+ side_input_type : 'iter',
+ access_percentage: 1,
+ ]
+ ],
+ [
+ name: '10gb-1kb-10workers-1window-iterable',
+ testSpecificOptions: [
+ input_options : '\'{' +
+ '"num_records": 10000000,' +
+ '"key_size": 100,' +
+ '"value_size": 900}\'',
+ side_input_type : 'iter',
+ ]
+ ],
+ [
+ name: '1gb-1kb-10workers-1window-first-list',
+ testSpecificOptions: [
+ input_options : '\'{' +
+ '"num_records": 1000000,' +
+ '"key_size": 100,' +
+ '"value_size": 900}\'',
+ side_input_type : 'list',
+ access_percentage: 1,
+ ]
+ ],
+ [
+ name: '1gb-1kb-10workers-1window-list',
+ testSpecificOptions: [
+ input_options : '\'{' +
+ '"num_records": 1000000,' +
+ '"key_size": 100,' +
+ '"value_size": 900}\'',
+ side_input_type : 'list',
+ ]
+ ],
+ [
+ name: '1gb-1kb-10workers-1000window-1key-percent-dict',
+ testSpecificOptions: [
+ input_options : '\'{' +
+ '"num_records": 1000000,' +
+ '"key_size": 100,' +
+ '"value_size": 900}\'',
+ side_input_type : 'dict',
+ access_percentage: 1,
+ window_count : 1000,
+ ]
+ ],
+ [
+ name: '1gb-1kb-10workers-1000window-99key-percent-dict',
+ testSpecificOptions: [
+ input_options : '\'{' +
+ '"num_records": 1000000,' +
+ '"key_size": 100,' +
+ '"value_size": 900}\'',
+ side_input_type : 'dict',
+ access_percentage: 99,
+ window_count : 1000,
+ ]
+ ],
+ [
+ name: '10gb-1kb-10workers-1000window-first-iterable',
+ testSpecificOptions: [
+ input_options : '\'{' +
+ '"num_records": 10000000,' +
+ '"key_size": 100,' +
+ '"value_size": 900}\'',
+ side_input_type : 'iter',
+ access_percentage: 1,
+ window_count : 1000,
+ ]
+ ],
+ [
+ name: '10gb-1kb-10workers-1000window-iterable',
+ testSpecificOptions: [
+ input_options : '\'{' +
+ '"num_records": 10000000,' +
+ '"key_size": 100,' +
+ '"value_size": 900}\'',
+ side_input_type : 'iter',
+ window_count : 1000,
+ ]
+ ],
+ ].indexed().collect { index, it ->
+ fromTemplate(mode, it.name, index + 1, datasetName, it.testSpecificOptions << additionalPipelineArgs)
+ }
+}
+
+
+def loadTestJob = { scope, triggeringContext, mode ->
+ def datasetName = loadTestsBuilder.getBigQueryDataset('load_test', triggeringContext)
+ loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.PYTHON_37,
+ loadTestConfigurations(mode, datasetName), 'SideInput', mode)
+}
+
+PhraseTriggeringPostCommitBuilder.postCommitJob(
+ 'beam_LoadTests_Python_SideInput_Dataflow_Batch',
+ 'Run Python Load Tests SideInput Dataflow Batch',
+ 'Load Tests Python SideInput Dataflow Batch suite',
+ this
+ ) {
+ additionalPipelineArgs = [:]
+ loadTestJob(delegate, CommonTestProperties.TriggeringContext.PR, 'batch')
+ }
+
+CronJobBuilder.cronJob('beam_LoadTests_Python_SideInput_Dataflow_Batch', 'H 11 * * *', this) {
+ additionalPipelineArgs = [
+ influx_db_name: InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+ influx_hostname: InfluxDBCredentialsHelper.InfluxDBHostUrl,
+ ]
+ loadTestJob(delegate, CommonTestProperties.TriggeringContext.POST_COMMIT, 'batch')
+}
diff --git a/sdks/python/apache_beam/testing/load_tests/sideinput_test.py b/sdks/python/apache_beam/testing/load_tests/sideinput_test.py
index 987c1ad..28dd16c 100644
--- a/sdks/python/apache_beam/testing/load_tests/sideinput_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/sideinput_test.py
@@ -27,11 +27,7 @@ This test can accept the following parameters:
* side_input_type (str) - Required. Specifies how the side input will be
materialized in ParDo operation. Choose from (dict, iter, list).
* window_count (int) - The number of fixed sized windows to subdivide the
- side input into. By default, no windows will be used.
- * side_input_size (int) - The size of the side input. Must be equal to or
- lower than the size of the main input. If lower, the side input will be
- created by applying a :class:`beam.combiners.Sample
- <apache_beam.transforms.combiners.Sample>` transform.
+ side input into. By default, a global window will be used.
* access_percentage (int) - Specifies the percentage of elements in the side
input to be accessed. By default, all elements will be accessed.
@@ -64,11 +60,17 @@ from __future__ import absolute_import
from __future__ import division
import logging
+from typing import Any
+from typing import Dict
+from typing import Iterable
+from typing import Tuple
+from typing import Union
import apache_beam as beam
from apache_beam.testing.load_tests.load_test import LoadTest
from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
-from apache_beam.testing.synthetic_pipeline import SyntheticSource
+from apache_beam.testing.synthetic_pipeline import SyntheticSDFAsSource
+from apache_beam.transforms import window
class SideInputTest(LoadTest):
@@ -77,10 +79,12 @@ class SideInputTest(LoadTest):
'list': beam.pvalue.AsList,
'dict': beam.pvalue.AsDict,
}
+ SDF_INITIAL_ELEMENTS = 1000
def __init__(self):
super(SideInputTest, self).__init__()
- self.windows = self.get_option_or_default('window_count', default=0)
+ self.windows = self.get_option_or_default('window_count', default=1)
+
self.access_percentage = self.get_option_or_default(
'access_percentage', default=100)
if self.access_percentage < 0 or self.access_percentage > 100:
@@ -88,10 +92,7 @@ class SideInputTest(LoadTest):
'access_percentage: Invalid value. Should be in range '
'from 0 to 100, got {} instead'.format(self.access_percentage))
- self.side_input_size = self.get_option_or_default(
- 'side_input_size', default=0)
- if self.side_input_size == 0:
- self.side_input_size = self.input_options.get('num_records')
+ self.elements_per_window = self.input_options['num_records'] // self.windows
self.side_input_type = self.pipeline.get_option('side_input_type')
if self.side_input_type is None:
@@ -110,10 +111,12 @@ class SideInputTest(LoadTest):
def test(self):
class SequenceSideInputTestDoFn(beam.DoFn):
"""Iterate over first n side_input elements."""
- def __init__(self, first_n):
+ def __init__(self, first_n: int):
self._first_n = first_n
- def process(self, unused_element, side_input):
+ def process( # type: ignore[override]
+ self, element: Any, side_input: Iterable[Tuple[bytes,
+ bytes]]) -> None:
i = 0
it = iter(side_input)
while i < self._first_n:
@@ -122,85 +125,93 @@ class SideInputTest(LoadTest):
# No-op. We only make sure that the element is accessed.
next(it)
except StopIteration:
- return
+ break
class MappingSideInputTestDoFn(beam.DoFn):
- """Take a sequence of keys as an additional side input and for each
- key in the sequence checks the value for key in the dictionary."""
- def process(self, unused_element, dict_side_input, keys_to_check):
- for key in keys_to_check:
+ """Iterates over first n keys in the dictionary and checks the value."""
+ def __init__(self, first_n: int):
+ self._first_n = first_n
+
+ def process( # type: ignore[override]
+ self, element: Any, dict_side_input: Dict[bytes, bytes]) -> None:
+ i = 0
+ for key in dict_side_input:
+ if i == self._first_n:
+ break
# No-op. We only make sure that the element is accessed.
dict_side_input[key]
+ i += 1
- class GetRandomKeys(beam.DoFn):
- def __init__(self, n):
- self._n = n
-
- def process(self, unused_element, dict_side_input):
- import random
- n = min(self._n, len(dict_side_input))
- return random.sample(dict_side_input.keys(), n)
-
- class AddEventTimestamps(beam.DoFn):
- """Assign timestamp to each element of PCollection."""
- def setup(self):
- self._timestamp = 0
-
- def process(self, element):
- from apache_beam.transforms.combiners import window
- yield window.TimestampedValue(element, self._timestamp)
- self._timestamp += 1
-
- input_pc = (
- self.pipeline
- | 'Read synthetic' >> beam.io.Read(
- SyntheticSource(self.parse_synthetic_source_options()))
- | 'Collect start time metrics' >> beam.ParDo(
- MeasureTime(self.metrics_namespace)))
-
- if self.side_input_size != self.input_options.get('num_records'):
- side_input = (
- input_pc
- | 'Sample {} elements'.format(self.side_input_size) >>
- beam.combiners.Sample.FixedSizeGlobally(self.side_input_size)
- | 'Flatten a sequence' >> beam.FlatMap(lambda x: x))
+ class AssignTimestamps(beam.DoFn):
+ """Produces timestamped values. Timestamps are equal to the value of the
+ element."""
+ def __init__(self):
+ # Avoid having to use save_main_session
+ self.window = window
+
+ def process(self, element: int) -> Iterable[window.TimestampedValue]: # type: ignore[override]
+ yield self.window.TimestampedValue(element, element)
+
+ class GetSyntheticSDFOptions(beam.DoFn):
+ def __init__(
+ self, elements_per_record: int, key_size: int, value_size: int):
+ self.elements_per_record = elements_per_record
+ self.key_size = key_size
+ self.value_size = value_size
+
+ def process(self, element: Any) -> Iterable[Dict[str, Union[int, str]]]: # type: ignore[override]
+ yield {
+ 'num_records': self.elements_per_record,
+ 'key_size': self.key_size,
+ 'value_size': self.value_size,
+ 'initial_splitting_num_bundles': 0,
+ 'initial_splitting_desired_bundle_size': 0,
+ 'sleep_per_input_record_sec': 0,
+ 'initial_splitting': 'const'
+ }
+
+ main_input = self.pipeline | 'Create' >> beam.Create(range(self.windows))
+
+ initial_elements = self.SDF_INITIAL_ELEMENTS
+ if self.windows > 1:
+ main_input = (
+ main_input
+ | 'Assign timestamps' >> beam.ParDo(AssignTimestamps())
+ | 'Apply windows' >> beam.WindowInto(window.FixedWindows(1)))
+ side_input = main_input
+ initial_elements = self.windows
else:
- side_input = input_pc
-
- if self.windows > 0:
- window_size = self.side_input_size / self.windows
- logging.info('Fixed windows of %s seconds will be applied', window_size)
- side_input = (
- side_input
- | 'Add event timestamps' >> beam.ParDo(AddEventTimestamps())
- | 'Apply windows' >> beam.WindowInto(
- beam.combiners.window.FixedWindows(window_size)))
+ side_input = self.pipeline | 'Side input: create' >> beam.Create(
+ range(initial_elements))
+
+ side_input = (
+ side_input
+ | 'Get synthetic SDF options' >> beam.ParDo(
+ GetSyntheticSDFOptions(
+ self.input_options['num_records'] // initial_elements,
+ self.input_options['key_size'],
+ self.input_options['value_size']))
+ | 'Generate input' >> beam.ParDo(SyntheticSDFAsSource()))
+ main_input |= 'Collect start time metrics' >> beam.ParDo(
+ MeasureTime(self.metrics_namespace))
side_input_type = self.materialize_as()
- elements_to_access = self.side_input_size * self.access_percentage // 100
+ elements_to_access = self.elements_per_window * \
+ self.access_percentage // 100
logging.info(
- '%s out of %s total elements in the side input will be '
- 'accessed.',
+ '%s out of %s total elements in each window will be accessed.',
elements_to_access,
- self.side_input_size)
+ self.elements_per_window)
if side_input_type is beam.pvalue.AsDict:
- random_keys = (
- self.pipeline
- | beam.Impulse()
- | 'Get random keys' >> beam.ParDo(
- GetRandomKeys(elements_to_access), beam.pvalue.AsDict(side_input))
- )
- pc = input_pc | beam.ParDo(
- MappingSideInputTestDoFn(),
- side_input_type(side_input),
- beam.pvalue.AsList(random_keys))
+ dofn = MappingSideInputTestDoFn(elements_to_access)
else:
- pc = input_pc | beam.ParDo(
- SequenceSideInputTestDoFn(elements_to_access),
- side_input_type(side_input))
+ dofn = SequenceSideInputTestDoFn(elements_to_access)
- _ = pc | 'Collect end time metrics' >> beam.ParDo(
- MeasureTime(self.metrics_namespace))
+ _ = (
+ main_input
+ | beam.ParDo(dofn, side_input_type(side_input))
+ | 'Collect end time metrics' >> beam.ParDo(
+ MeasureTime(self.metrics_namespace)))
if __name__ == '__main__':