You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2019/12/02 23:36:42 UTC
[beam] 03/03: [BEAM-2929] Ensure that the Beam Python SDK sends the
property "use_indexed_format" to Dataflow for side inputs which use a
multimap materialization.
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 9fc641ac8f7c1385c82e71e35d55c1d4d77a1147
Author: Luke Cwik <lc...@google.com>
AuthorDate: Wed Nov 27 12:56:59 2019 -0800
[BEAM-2929] Ensure that the Beam Python SDK sends the property "use_indexed_format" to Dataflow for side inputs which use a multimap materialization.
---
.../runners/dataflow/dataflow_runner.py | 22 ++++++++++++++--------
.../apache_beam/runners/dataflow/internal/names.py | 1 +
2 files changed, 15 insertions(+), 8 deletions(-)
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index d07abc4..718ab61 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -607,7 +607,8 @@ class DataflowRunner(PipelineRunner):
return step
def _add_singleton_step(
- self, label, full_label, tag, input_step, windowing_strategy):
+ self, label, full_label, tag, input_step, windowing_strategy,
+ access_pattern):
"""Creates a CollectionToSingleton step used to handle ParDo side inputs."""
# Import here to avoid adding the dependency for local running scenarios.
from apache_beam.runners.dataflow.internal import apiclient
@@ -620,12 +621,16 @@ class DataflowRunner(PipelineRunner):
PropertyNames.STEP_NAME: input_step.proto.name,
PropertyNames.OUTPUT_NAME: input_step.get_output(tag)})
step.encoding = self._get_side_input_encoding(input_step.encoding)
- step.add_property(
- PropertyNames.OUTPUT_INFO,
- [{PropertyNames.USER_NAME: (
- '%s.%s' % (full_label, PropertyNames.OUTPUT)),
- PropertyNames.ENCODING: step.encoding,
- PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
+
+ output_info = {
+ PropertyNames.USER_NAME: '%s.%s' % (full_label, PropertyNames.OUTPUT),
+ PropertyNames.ENCODING: step.encoding,
+ PropertyNames.OUTPUT_NAME: PropertyNames.OUT
+ }
+ if common_urns.side_inputs.MULTIMAP.urn == access_pattern:
+ output_info[PropertyNames.USE_INDEXED_FORMAT] = True
+ step.add_property(PropertyNames.OUTPUT_INFO, [output_info])
+
step.add_property(
PropertyNames.WINDOWING_STRATEGY,
self.serialize_windowing_strategy(windowing_strategy))
@@ -820,7 +825,8 @@ class DataflowRunner(PipelineRunner):
self._add_singleton_step(
step_name, si_full_label, side_pval.pvalue.tag,
self._cache.get_pvalue(side_pval.pvalue),
- side_pval.pvalue.windowing)
+ side_pval.pvalue.windowing,
+ side_pval._side_input_data().access_pattern)
si_dict[si_label] = {
'@type': 'OutputReference',
PropertyNames.STEP_NAME: step_name,
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index 5b2dd89..fdce49b 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -117,6 +117,7 @@ class PropertyNames(object):
SOURCE_STEP_INPUT = 'custom_source_step_input'
SERIALIZED_TEST_STREAM = 'serialized_test_stream'
STEP_NAME = 'step_name'
+ USE_INDEXED_FORMAT = 'use_indexed_format'
USER_FN = 'user_fn'
USER_NAME = 'user_name'
VALIDATE_SINK = 'validate_sink'