You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by go...@apache.org on 2020/01/21 21:43:20 UTC
[beam] 01/01: Revert "[BEAM-9122] Add uses_keyed_state step property in python dataflow run… (#10596)"
This is an automated email from the ASF dual-hosted git repository.
goenka pushed a commit to branch revert-10596-uses_keyed_state
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 2269a5015dfb0bb3b1f8469d06f1ab622f85a3ab
Author: Ankur <an...@users.noreply.github.com>
AuthorDate: Tue Jan 21 13:43:04 2020 -0800
Revert "[BEAM-9122] Add uses_keyed_state step property in python dataflow run… (#10596)"
This reverts commit 52b478eeccfd19bfc05fecd519f7dc54db8c67eb.
---
sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 7 ++-----
sdks/python/apache_beam/runners/dataflow/internal/names.py | 1 -
2 files changed, 2 insertions(+), 6 deletions(-)
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index ada700c..69b1fb8 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -52,7 +52,6 @@ from apache_beam.options.pipeline_options import TestOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.portability import common_urns
from apache_beam.pvalue import AsSideInput
-from apache_beam.runners.common import DoFnSignature
from apache_beam.runners.dataflow.internal import names
from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
from apache_beam.runners.dataflow.internal.names import PropertyNames
@@ -953,10 +952,6 @@ class DataflowRunner(PipelineRunner):
step.add_property(PropertyNames.RESTRICTION_ENCODING,
self._get_cloud_encoding(restriction_coder))
- if options.view_as(StandardOptions).streaming and DoFnSignature(
- transform.dofn).is_stateful_dofn():
- step.add_property(PropertyNames.USES_KEYED_STATE, "true")
-
@staticmethod
def _pardo_fn_data(transform_node, get_label):
transform = transform_node.transform
@@ -1134,6 +1129,7 @@ class DataflowRunner(PipelineRunner):
coders.registry.get_coder(transform_node.outputs[None].element_type),
coders.coders.GlobalWindowCoder())
+ from apache_beam.runners.dataflow.internal import apiclient
step.encoding = self._get_cloud_encoding(coder)
step.add_property(
PropertyNames.OUTPUT_INFO,
@@ -1219,6 +1215,7 @@ class DataflowRunner(PipelineRunner):
# correct coder.
coder = coders.WindowedValueCoder(transform.sink.coder,
coders.coders.GlobalWindowCoder())
+ from apache_beam.runners.dataflow.internal import apiclient
step.encoding = self._get_cloud_encoding(coder)
step.add_property(PropertyNames.ENCODING, step.encoding)
step.add_property(
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index e9b34d4..7bc0295 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -123,7 +123,6 @@ class PropertyNames(object):
USE_INDEXED_FORMAT = 'use_indexed_format'
USER_FN = 'user_fn'
USER_NAME = 'user_name'
- USES_KEYED_STATE = 'uses_keyed_state'
VALIDATE_SINK = 'validate_sink'
VALIDATE_SOURCE = 'validate_source'
VALUE = 'value'