You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2019/12/02 23:36:42 UTC

[beam] 03/03: [BEAM-2929] Ensure that the Beam Python SDK sends the property "use_indexed_format" to Dataflow for side inputs which use a multimap materialization.

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 9fc641ac8f7c1385c82e71e35d55c1d4d77a1147
Author: Luke Cwik <lc...@google.com>
AuthorDate: Wed Nov 27 12:56:59 2019 -0800

    [BEAM-2929] Ensure that the Beam Python SDK sends the property "use_indexed_format" to Dataflow for side inputs which use a multimap materialization.
---
 .../runners/dataflow/dataflow_runner.py            | 22 ++++++++++++++--------
 .../apache_beam/runners/dataflow/internal/names.py |  1 +
 2 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index d07abc4..718ab61 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -607,7 +607,8 @@ class DataflowRunner(PipelineRunner):
     return step
 
   def _add_singleton_step(
-      self, label, full_label, tag, input_step, windowing_strategy):
+      self, label, full_label, tag, input_step, windowing_strategy,
+      access_pattern):
     """Creates a CollectionToSingleton step used to handle ParDo side inputs."""
     # Import here to avoid adding the dependency for local running scenarios.
     from apache_beam.runners.dataflow.internal import apiclient
@@ -620,12 +621,16 @@ class DataflowRunner(PipelineRunner):
          PropertyNames.STEP_NAME: input_step.proto.name,
          PropertyNames.OUTPUT_NAME: input_step.get_output(tag)})
     step.encoding = self._get_side_input_encoding(input_step.encoding)
-    step.add_property(
-        PropertyNames.OUTPUT_INFO,
-        [{PropertyNames.USER_NAME: (
-            '%s.%s' % (full_label, PropertyNames.OUTPUT)),
-          PropertyNames.ENCODING: step.encoding,
-          PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
+
+    output_info = {
+        PropertyNames.USER_NAME: '%s.%s' % (full_label, PropertyNames.OUTPUT),
+        PropertyNames.ENCODING: step.encoding,
+        PropertyNames.OUTPUT_NAME: PropertyNames.OUT
+    }
+    if common_urns.side_inputs.MULTIMAP.urn == access_pattern:
+      output_info[PropertyNames.USE_INDEXED_FORMAT] = True
+    step.add_property(PropertyNames.OUTPUT_INFO, [output_info])
+
     step.add_property(
         PropertyNames.WINDOWING_STRATEGY,
         self.serialize_windowing_strategy(windowing_strategy))
@@ -820,7 +825,8 @@ class DataflowRunner(PipelineRunner):
       self._add_singleton_step(
           step_name, si_full_label, side_pval.pvalue.tag,
           self._cache.get_pvalue(side_pval.pvalue),
-          side_pval.pvalue.windowing)
+          side_pval.pvalue.windowing,
+          side_pval._side_input_data().access_pattern)
       si_dict[si_label] = {
           '@type': 'OutputReference',
           PropertyNames.STEP_NAME: step_name,
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index 5b2dd89..fdce49b 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -117,6 +117,7 @@ class PropertyNames(object):
   SOURCE_STEP_INPUT = 'custom_source_step_input'
   SERIALIZED_TEST_STREAM = 'serialized_test_stream'
   STEP_NAME = 'step_name'
+  USE_INDEXED_FORMAT = 'use_indexed_format'
   USER_FN = 'user_fn'
   USER_NAME = 'user_name'
   VALIDATE_SINK = 'validate_sink'