You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/12/09 03:36:38 UTC
[beam] branch master updated: Add GroupIntoBatches to runner API;
add Dataflow override in Python SDK
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 05c8471 Add GroupIntoBatches to runner API; add Dataflow override in Python SDK
new 2c96aeb Merge pull request #13405 from [BEAM-10703, BEAM-10475] Add GroupIntoBatches to runner API; add Dataflow override in Python SDK
05c8471 is described below
commit 05c8471b27e03e5611a2a13137c4a785f2d17fc9
Author: sychen <sy...@google.com>
AuthorDate: Mon Nov 9 21:16:50 2020 -0800
Add GroupIntoBatches to runner API; add Dataflow override in Python SDK
---
.../pipeline/src/main/proto/beam_runner_api.proto | 18 ++++
sdks/python/apache_beam/portability/common_urns.py | 1 +
.../runners/dataflow/dataflow_runner.py | 32 +++++-
.../runners/dataflow/dataflow_runner_test.py | 62 ++++++++++++
.../apache_beam/runners/dataflow/internal/names.py | 5 +
.../runners/dataflow/ptransform_overrides.py | 45 +++++++++
sdks/python/apache_beam/transforms/util.py | 111 ++++++++++++++++-----
sdks/python/apache_beam/transforms/util_test.py | 41 ++++++++
8 files changed, 288 insertions(+), 27 deletions(-)
diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index cbfd817..ce561d3 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -344,6 +344,10 @@ message StandardPTransforms {
// Payload: PubSubWritePayload.
PUBSUB_WRITE = 5 [(beam_urn) = "beam:transform:pubsub_write:v1"];
+
+ // Represents the GroupIntoBatches.WithShardedKey operation.
+ // Payload: GroupIntoBatchesPayload
+ GROUP_INTO_BATCHES_WITH_SHARDED_KEY = 6 [(beam_urn) = "beam:transform:group_into_batches_with_sharded_key:v1"];
}
// Payload for all of these: CombinePayload
enum CombineComponents {
@@ -414,6 +418,10 @@ message StandardPTransforms {
// Output: KV(KV(element, restriction), size).
TRUNCATE_SIZED_RESTRICTION = 3 [(beam_urn) = "beam:transform:sdf_truncate_sized_restrictions:v1"];
}
+ // Payload for all of these: GroupIntoBatchesPayload
+ enum GroupIntoBatchesComponents {
+ GROUP_INTO_BATCHES = 0 [(beam_urn) = "beam:transform:group_into_batches:v1"];
+ }
}
message StandardSideInputTypes {
@@ -706,6 +714,16 @@ message PubSubWritePayload {
string id_attribute = 3;
}
+// Payload for GroupIntoBatches composite transform.
+message GroupIntoBatchesPayload {
+
+ // (Required) Max size of a batch.
+ int64 batch_size = 1;
+
+ // (Optional) Max duration a batch is allowed to be cached in states.
+ int64 max_buffering_duration_millis = 2;
+}
+
// A coder, the binary format for serialization and deserialization of data in
// a pipeline.
message Coder {
diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py
index 819264d..e356319 100644
--- a/sdks/python/apache_beam/portability/common_urns.py
+++ b/sdks/python/apache_beam/portability/common_urns.py
@@ -42,6 +42,7 @@ deprecated_primitives = StandardPTransforms.DeprecatedPrimitives
composites = StandardPTransforms.Composites
combine_components = StandardPTransforms.CombineComponents
sdf_components = StandardPTransforms.SplittableParDoComponents
+group_into_batches_components = StandardPTransforms.GroupIntoBatchesComponents
side_inputs = StandardSideInputTypes.Enum
coders = StandardCoders.Enum
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 06853f4..92d9c19 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -505,7 +505,11 @@ class DataflowRunner(PipelineRunner):
pipeline.replace_all(DataflowRunner._PTRANSFORM_OVERRIDES)
from apache_beam.runners.dataflow.ptransform_overrides import WriteToBigQueryPTransformOverride
- pipeline.replace_all([WriteToBigQueryPTransformOverride(pipeline, options)])
+ from apache_beam.runners.dataflow.ptransform_overrides import GroupIntoBatchesWithShardedKeyPTransformOverride
+ pipeline.replace_all([
+ WriteToBigQueryPTransformOverride(pipeline, options),
+ GroupIntoBatchesWithShardedKeyPTransformOverride(self, options)
+ ])
if use_fnapi and not apiclient._use_unified_worker(options):
pipeline.replace_all(DataflowRunner._JRH_PTRANSFORM_OVERRIDES)
@@ -727,6 +731,18 @@ class DataflowRunner(PipelineRunner):
window_coder = None
return self._get_typehint_based_encoding(element_type, window_coder)
+ def get_pcoll_with_auto_sharding(self):
+ if not hasattr(self, '_pcoll_with_auto_sharding'):
+ return set()
+ return self._pcoll_with_auto_sharding
+
+ def add_pcoll_with_auto_sharding(self, applied_ptransform):
+ if not hasattr(self, '_pcoll_with_auto_sharding'):
+ self.__setattr__('_pcoll_with_auto_sharding', set())
+ output = DataflowRunner._only_element(applied_ptransform.outputs.keys())
+ self._pcoll_with_auto_sharding.add(
+ applied_ptransform.outputs[output]._unique_name())
+
def _add_step(self, step_kind, step_label, transform_node, side_tags=()):
"""Creates a Step object and adds it to the cache."""
# Import here to avoid adding the dependency for local running scenarios.
@@ -1112,6 +1128,20 @@ class DataflowRunner(PipelineRunner):
if is_stateful_dofn:
step.add_property(PropertyNames.USES_KEYED_STATE, 'true')
+ # Also checks whether the step allows shardable keyed states.
+ # TODO(BEAM-11360): remove this when migrated to portable job
+ # submission since we only consider supporting the property in runner
+ # v2.
+ for pcoll in transform_node.outputs.values():
+ if pcoll._unique_name() in self.get_pcoll_with_auto_sharding():
+ step.add_property(PropertyNames.ALLOWS_SHARDABLE_STATE, 'true')
+ # Currently we only allow auto-sharding to be enabled through the
+ # GroupIntoBatches transform. So we also add the following property
+ # which GroupIntoBatchesDoFn has, to allow the backend to perform
+ # graph optimization.
+ step.add_property(PropertyNames.PRESERVES_KEYS, 'true')
+ break
+
@staticmethod
def _pardo_fn_data(transform_node, get_label):
transform = transform_node.transform
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 8d64540..436d5d1 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -48,6 +48,7 @@ from apache_beam.runners import TestDataflowRunner
from apache_beam.runners import create_runner
from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult
from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException
+from apache_beam.runners.dataflow.dataflow_runner import PropertyNames
from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
from apache_beam.runners.runner import PipelineState
from apache_beam.testing.extra_assertions import ExtraAssertionsMixin
@@ -787,6 +788,67 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
except ValueError:
self.fail('ValueError raised unexpectedly')
+ def _run_group_into_batches_and_get_step_properties(
+ self, with_sharded_key, additional_properties):
+ self.default_properties.append('--streaming')
+ self.default_properties.append(
+ '--experiment=enable_streaming_auto_sharding')
+ for property in additional_properties:
+ self.default_properties.append(property)
+
+ runner = DataflowRunner()
+ with beam.Pipeline(runner=runner,
+ options=PipelineOptions(self.default_properties)) as p:
+ # pylint: disable=expression-not-assigned
+ input = p | beam.Create([('a', 1), ('a', 1), ('b', 3), ('b', 4)])
+ if with_sharded_key:
+ (
+ input | beam.GroupIntoBatches.WithShardedKey(2)
+ | beam.Map(lambda key_values: (key_values[0].key, key_values[1])))
+ step_name = (
+ u'WithShardedKey/GroupIntoBatches/ParDo(_GroupIntoBatchesDoFn)')
+ else:
+ input | beam.GroupIntoBatches(2)
+ step_name = u'GroupIntoBatches/ParDo(_GroupIntoBatchesDoFn)'
+
+ return self._find_step(runner.job, step_name)['properties']
+
+ def test_group_into_batches_translation(self):
+ properties = self._run_group_into_batches_and_get_step_properties(
+ True, ['--enable_streaming_engine', '--experiment=use_runner_v2'])
+ self.assertEqual(properties[PropertyNames.USES_KEYED_STATE], u'true')
+ self.assertEqual(properties[PropertyNames.ALLOWS_SHARDABLE_STATE], u'true')
+ self.assertEqual(properties[PropertyNames.PRESERVES_KEYS], u'true')
+
+ def test_group_into_batches_translation_non_se(self):
+ properties = self._run_group_into_batches_and_get_step_properties(
+ True, ['--experiment=use_runner_v2'])
+ self.assertEqual(properties[PropertyNames.USES_KEYED_STATE], u'true')
+ self.assertFalse(PropertyNames.ALLOWS_SHARDABLE_STATE in properties)
+ self.assertFalse(PropertyNames.PRESERVES_KEYS in properties)
+
+ def test_group_into_batches_translation_non_sharded(self):
+ properties = self._run_group_into_batches_and_get_step_properties(
+ False, ['--enable_streaming_engine', '--experiment=use_runner_v2'])
+ self.assertEqual(properties[PropertyNames.USES_KEYED_STATE], u'true')
+ self.assertFalse(PropertyNames.ALLOWS_SHARDABLE_STATE in properties)
+ self.assertFalse(PropertyNames.PRESERVES_KEYS in properties)
+
+ def test_group_into_batches_translation_non_unified_worker(self):
+ # non-portable
+ properties = self._run_group_into_batches_and_get_step_properties(
+ True, ['--enable_streaming_engine'])
+ self.assertEqual(properties[PropertyNames.USES_KEYED_STATE], u'true')
+ self.assertFalse(PropertyNames.ALLOWS_SHARDABLE_STATE in properties)
+ self.assertFalse(PropertyNames.PRESERVES_KEYS in properties)
+
+ # JRH
+ properties = self._run_group_into_batches_and_get_step_properties(
+ True, ['--enable_streaming_engine', '--experiment=beam_fn_api'])
+ self.assertEqual(properties[PropertyNames.USES_KEYED_STATE], u'true')
+ self.assertFalse(PropertyNames.ALLOWS_SHARDABLE_STATE in properties)
+ self.assertFalse(PropertyNames.PRESERVES_KEYS in properties)
+
class CustomMergingWindowFn(window.WindowFn):
def assign(self, assign_context):
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index 79ef7c1..7a2ed09 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -69,6 +69,8 @@ class PropertyNames(object):
Property strings as they are expected in the CloudWorkflow protos.
"""
+ # If uses_keyed_state, whether the state can be sharded.
+ ALLOWS_SHARDABLE_STATE = 'allows_shardable_state'
BIGQUERY_CREATE_DISPOSITION = 'create_disposition'
BIGQUERY_DATASET = 'dataset'
BIGQUERY_EXPORT_FORMAT = 'bigquery_export_format'
@@ -98,6 +100,9 @@ class PropertyNames(object):
OUTPUT_NAME = 'output_name'
PARALLEL_INPUT = 'parallel_input'
PIPELINE_PROTO_TRANSFORM_ID = 'pipeline_proto_transform_id'
+ # If the input element is a key/value pair, then the output element(s) all
+ # have the same key as the input.
+ PRESERVES_KEYS = 'preserves_keys'
PUBSUB_ID_LABEL = 'pubsub_id_label'
PUBSUB_SERIALIZED_ATTRIBUTES_FN = 'pubsub_serialized_attributes_fn'
PUBSUB_SUBSCRIPTION = 'pubsub_subscription'
diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
index f91c016..402a4ed 100644
--- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
+++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
@@ -22,6 +22,7 @@
from __future__ import absolute_import
from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.pipeline import PTransformOverride
@@ -329,3 +330,47 @@ class WriteToBigQueryPTransformOverride(PTransformOverride):
return {key: out for key in self.outputs}
return WriteToBigQuery(ptransform, self.outputs)
+
+
+class GroupIntoBatchesWithShardedKeyPTransformOverride(PTransformOverride):
+ """A ``PTransformOverride`` for ``GroupIntoBatches.WithShardedKey``.
+
+ This override simply returns the original transform but additionally records
+ the output PCollection in order to append required step properties during
+ graph translation.
+ """
+ def __init__(self, dataflow_runner, options):
+ self.dataflow_runner = dataflow_runner
+ self.options = options
+
+ def matches(self, applied_ptransform):
+ # Imported here to avoid circular dependencies.
+ # pylint: disable=wrong-import-order, wrong-import-position
+ from apache_beam import util
+
+ transform = applied_ptransform.transform
+
+ if not isinstance(transform, util.GroupIntoBatches.WithShardedKey):
+ return False
+
+ # The replacement is only valid for portable Streaming Engine jobs with
+ # runner v2.
+ standard_options = self.options.view_as(StandardOptions)
+ if not standard_options.streaming:
+ return False
+ google_cloud_options = self.options.view_as(GoogleCloudOptions)
+ if not google_cloud_options.enable_streaming_engine:
+ return False
+
+ from apache_beam.runners.dataflow.internal import apiclient
+ if not apiclient._use_unified_worker(self.options):
+ return False
+ experiments = self.options.view_as(DebugOptions).experiments or []
+ if 'enable_streaming_auto_sharding' not in experiments:
+ return False
+
+ self.dataflow_runner.add_pcoll_with_auto_sharding(applied_ptransform)
+ return True
+
+ def get_replacement_transform_for_applied_ptransform(self, ptransform):
+ return ptransform.transform
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 85681f4..2e8d7c9 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -50,6 +50,7 @@ from apache_beam import coders
from apache_beam import typehints
from apache_beam.metrics import Metrics
from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.transforms import window
from apache_beam.transforms.combiners import CountCombineFn
from apache_beam.transforms.core import CombinePerKey
@@ -763,15 +764,12 @@ class GroupIntoBatches(PTransform):
max_buffering_duration_secs: (optional) How long in seconds at most an
incomplete batch of elements is allowed to be buffered in the states.
The duration must be a positive second duration and should be given as
- an int or float.
+ an int or float. Setting this parameter to zero effectively means no
+ buffering limit.
clock: (optional) an alternative to time.time (mostly for testing)
"""
- self.batch_size = batch_size
-
- if max_buffering_duration_secs is not None:
- assert max_buffering_duration_secs > 0, (
- 'max buffering duration should be a positive value')
- self.max_buffering_duration_secs = max_buffering_duration_secs
+ self.params = _GroupIntoBatchesParams(
+ batch_size, max_buffering_duration_secs)
self.clock = clock
def expand(self, pcoll):
@@ -779,11 +777,25 @@ class GroupIntoBatches(PTransform):
return pcoll | ParDo(
_pardo_group_into_batches(
input_coder,
- self.batch_size,
- self.max_buffering_duration_secs,
+ self.params.batch_size,
+ self.params.max_buffering_duration_secs,
self.clock))
- @experimental()
+ def to_runner_api_parameter(
+ self,
+ unused_context # type: PipelineContext
+ ): # type: (...) -> Tuple[str, beam_runner_api_pb2.GroupIntoBatchesPayload]
+ return (
+ common_urns.group_into_batches_components.GROUP_INTO_BATCHES.urn,
+ self.params.get_payload())
+
+ @staticmethod
+ @PTransform.register_urn(
+ common_urns.group_into_batches_components.GROUP_INTO_BATCHES.urn,
+ beam_runner_api_pb2.GroupIntoBatchesPayload)
+ def from_runner_api_parameter(unused_ptransform, proto, unused_context):
+ return GroupIntoBatches(*_GroupIntoBatchesParams.parse_payload(proto))
+
@typehints.with_input_types(Tuple[K, V])
@typehints.with_output_types(Tuple[K, Iterable[V]])
class WithShardedKey(PTransform):
@@ -796,21 +808,11 @@ class GroupIntoBatches(PTransform):
execution time.
"""
def __init__(self, batch_size, max_buffering_duration_secs=None):
- """Create a new GroupIntoBatches.WithShardedKey.
-
- Arguments:
- batch_size: (required) How many elements should be in a batch
- max_buffering_duration_secs: (optional) How long in seconds at most an
- incomplete batch of elements is allowed to be buffered in the states.
- The duration must be a positive second duration and should be given as
- an int or float.
+ """Create a new GroupIntoBatches with sharded output.
+ See ``GroupIntoBatches`` transform for a description of input parameters.
"""
- self.batch_size = batch_size
-
- if max_buffering_duration_secs is not None:
- assert max_buffering_duration_secs > 0, (
- 'max buffering duration should be a positive value')
- self.max_buffering_duration_secs = max_buffering_duration_secs
+ self.params = _GroupIntoBatchesParams(
+ batch_size, max_buffering_duration_secs)
_shard_id_prefix = uuid.uuid4().bytes
@@ -825,7 +827,64 @@ class GroupIntoBatches(PTransform):
key_value[1]))
return (
sharded_pcoll
- | GroupIntoBatches(self.batch_size, self.max_buffering_duration_secs))
+ | GroupIntoBatches(
+ self.params.batch_size, self.params.max_buffering_duration_secs))
+
+ def to_runner_api_parameter(
+ self,
+ unused_context # type: PipelineContext
+ ): # type: (...) -> Tuple[str, beam_runner_api_pb2.GroupIntoBatchesPayload]
+ return (
+ common_urns.composites.GROUP_INTO_BATCHES_WITH_SHARDED_KEY.urn,
+ self.params.get_payload())
+
+ @staticmethod
+ @PTransform.register_urn(
+ common_urns.composites.GROUP_INTO_BATCHES_WITH_SHARDED_KEY.urn,
+ beam_runner_api_pb2.GroupIntoBatchesPayload)
+ def from_runner_api_parameter(unused_ptransform, proto, unused_context):
+ return GroupIntoBatches.WithShardedKey(
+ *_GroupIntoBatchesParams.parse_payload(proto))
+
+
+class _GroupIntoBatchesParams:
+ """This class represents the parameters for
+ :class:`apache_beam.utils.GroupIntoBatches` transform, used to define how
+ elements should be batched.
+ """
+ def __init__(self, batch_size, max_buffering_duration_secs):
+ self.batch_size = batch_size
+ self.max_buffering_duration_secs = (
+ 0
+ if max_buffering_duration_secs is None else max_buffering_duration_secs)
+ self._validate()
+
+ def __eq__(self, other):
+ if other is None or not isinstance(other, _GroupIntoBatchesParams):
+ return False
+ return (
+ self.batch_size == other.batch_size and
+ self.max_buffering_duration_secs == other.max_buffering_duration_secs)
+
+ def _validate(self):
+ assert self.batch_size is not None and self.batch_size > 0, (
+ 'batch_size must be a positive value')
+ assert (
+ self.max_buffering_duration_secs is not None and
+ self.max_buffering_duration_secs >= 0), (
+ 'max_buffering_duration must be a non-negative value')
+
+ def get_payload(self):
+ return beam_runner_api_pb2.GroupIntoBatchesPayload(
+ batch_size=self.batch_size,
+ max_buffering_duration_millis=int(
+ self.max_buffering_duration_secs * 1000))
+
+ @staticmethod
+ def parse_payload(
+ proto # type: beam_runner_api_pb2.GroupIntoBatchesPayload
+ ):
+ return proto.batch_size, proto.max_buffering_duration_millis / 1000
def _pardo_group_into_batches(
@@ -850,7 +909,7 @@ def _pardo_group_into_batches(
element_state.add(element)
count_state.add(1)
count = count_state.read()
- if count == 1 and max_buffering_duration_secs is not None:
+ if count == 1 and max_buffering_duration_secs > 0:
# This is the first element in batch. Start counting buffering time if a
# limit was set.
buffering_timer.set(clock() + max_buffering_duration_secs)
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index 26dcdb1..0022130 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -43,6 +43,9 @@ from apache_beam import WindowInto
from apache_beam.coders import coders
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners import pipeline_context
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import TestWindowedValue
@@ -62,6 +65,7 @@ from apache_beam.transforms.window import IntervalWindow
from apache_beam.transforms.window import Sessions
from apache_beam.transforms.window import SlidingWindows
from apache_beam.transforms.window import TimestampedValue
+from apache_beam.utils import proto_utils
from apache_beam.utils import timestamp
from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import MIN_TIMESTAMP
@@ -773,6 +777,43 @@ class GroupIntoBatchesTest(unittest.TestCase):
# the global window ends.
assert_that(num_elements_per_batch, equal_to([9, 1]))
+ def _test_runner_api_round_trip(self, transform, urn):
+ context = pipeline_context.PipelineContext()
+ proto = transform.to_runner_api(context)
+ self.assertEqual(urn, proto.urn)
+ payload = (
+ proto_utils.parse_Bytes(
+ proto.payload, beam_runner_api_pb2.GroupIntoBatchesPayload))
+ self.assertEqual(transform.params.batch_size, payload.batch_size)
+ self.assertEqual(
+ transform.params.max_buffering_duration_secs * 1000,
+ payload.max_buffering_duration_millis)
+
+ transform_from_proto = (
+ transform.__class__.from_runner_api_parameter(None, payload, None))
+ self.assertTrue(isinstance(transform_from_proto, transform.__class__))
+ self.assertEqual(transform.params, transform_from_proto.params)
+
+ def test_runner_api(self):
+ batch_size = 10
+ max_buffering_duration_secs = [None, 0, 5]
+
+ for duration in max_buffering_duration_secs:
+ self._test_runner_api_round_trip(
+ util.GroupIntoBatches(batch_size, duration),
+ common_urns.group_into_batches_components.GROUP_INTO_BATCHES.urn)
+ self._test_runner_api_round_trip(
+ util.GroupIntoBatches(batch_size),
+ common_urns.group_into_batches_components.GROUP_INTO_BATCHES.urn)
+
+ for duration in max_buffering_duration_secs:
+ self._test_runner_api_round_trip(
+ util.GroupIntoBatches.WithShardedKey(batch_size, duration),
+ common_urns.composites.GROUP_INTO_BATCHES_WITH_SHARDED_KEY.urn)
+ self._test_runner_api_round_trip(
+ util.GroupIntoBatches.WithShardedKey(batch_size),
+ common_urns.composites.GROUP_INTO_BATCHES_WITH_SHARDED_KEY.urn)
+
class ToStringTest(unittest.TestCase):
def test_tostring_elements(self):