Posted to commits@beam.apache.org by go...@apache.org on 2020/01/21 21:43:20 UTC

[beam] 01/01: Revert "[BEAM-9122] Add uses_keyed_state step property in python dataflow run… (#10596)"

This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch revert-10596-uses_keyed_state
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 2269a5015dfb0bb3b1f8469d06f1ab622f85a3ab
Author: Ankur <an...@users.noreply.github.com>
AuthorDate: Tue Jan 21 13:43:04 2020 -0800

    Revert "[BEAM-9122] Add uses_keyed_state step property in python dataflow run… (#10596)"
    
    This reverts commit 52b478eeccfd19bfc05fecd519f7dc54db8c67eb.
---
 sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 7 ++-----
 sdks/python/apache_beam/runners/dataflow/internal/names.py  | 1 -
 2 files changed, 2 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index ada700c..69b1fb8 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -52,7 +52,6 @@ from apache_beam.options.pipeline_options import TestOptions
 from apache_beam.options.pipeline_options import WorkerOptions
 from apache_beam.portability import common_urns
 from apache_beam.pvalue import AsSideInput
-from apache_beam.runners.common import DoFnSignature
 from apache_beam.runners.dataflow.internal import names
 from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
 from apache_beam.runners.dataflow.internal.names import PropertyNames
@@ -953,10 +952,6 @@ class DataflowRunner(PipelineRunner):
       step.add_property(PropertyNames.RESTRICTION_ENCODING,
                         self._get_cloud_encoding(restriction_coder))
 
-    if options.view_as(StandardOptions).streaming and DoFnSignature(
-        transform.dofn).is_stateful_dofn():
-      step.add_property(PropertyNames.USES_KEYED_STATE, "true")
-
   @staticmethod
   def _pardo_fn_data(transform_node, get_label):
     transform = transform_node.transform
@@ -1134,6 +1129,7 @@ class DataflowRunner(PipelineRunner):
         coders.registry.get_coder(transform_node.outputs[None].element_type),
         coders.coders.GlobalWindowCoder())
 
+    from apache_beam.runners.dataflow.internal import apiclient
     step.encoding = self._get_cloud_encoding(coder)
     step.add_property(
         PropertyNames.OUTPUT_INFO,
@@ -1219,6 +1215,7 @@ class DataflowRunner(PipelineRunner):
     # correct coder.
     coder = coders.WindowedValueCoder(transform.sink.coder,
                                       coders.coders.GlobalWindowCoder())
+    from apache_beam.runners.dataflow.internal import apiclient
     step.encoding = self._get_cloud_encoding(coder)
     step.add_property(PropertyNames.ENCODING, step.encoding)
     step.add_property(
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index e9b34d4..7bc0295 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -123,7 +123,6 @@ class PropertyNames(object):
   USE_INDEXED_FORMAT = 'use_indexed_format'
   USER_FN = 'user_fn'
   USER_NAME = 'user_name'
-  USES_KEYED_STATE = 'uses_keyed_state'
   VALIDATE_SINK = 'validate_sink'
   VALIDATE_SOURCE = 'validate_source'
   VALUE = 'value'
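
For context, the change being reverted (visible in the diff above) made the Dataflow runner tag streaming steps backed by a stateful DoFn with the 'uses_keyed_state' property, via the check options.view_as(StandardOptions).streaming and DoFnSignature(transform.dofn).is_stateful_dofn(). Below is a minimal, standalone sketch of that detection logic; the helper function and the example DoFn are illustrative only and are not part of the patch.

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import StandardOptions
    from apache_beam.runners.common import DoFnSignature
    from apache_beam.transforms.userstate import CombiningValueStateSpec


    def uses_keyed_state(dofn, options):
      # Mirrors the reverted check: only a streaming pipeline whose DoFn
      # declares state (StateSpec/TimerSpec) would have been tagged with
      # PropertyNames.USES_KEYED_STATE.
      return (options.view_as(StandardOptions).streaming
              and DoFnSignature(dofn).is_stateful_dofn())


    class CountingDoFn(beam.DoFn):
      # Hypothetical stateful DoFn, used only to exercise the check above.
      COUNT = CombiningValueStateSpec('count', sum)

      def process(self, element, count=beam.DoFn.StateParam(COUNT)):
        count.add(1)
        yield element


    if __name__ == '__main__':
      opts = PipelineOptions(['--streaming'])
      # Expected to print True: streaming pipeline plus a stateful DoFn.
      print(uses_keyed_state(CountingDoFn(), opts))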