You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/02 20:09:10 UTC
[beam] branch master updated: Allow inference of CombiningValueState coders.
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6d4771b Allow inference of CombiningValueState coders.
new a434c9e Merge pull request #8402 from robertwb/infer-state-coder
6d4771b is described below
commit 6d4771b7cdac7e6b0a6cdcca000fd4bcf455bbef
Author: Robert Bradshaw <ro...@google.com>
AuthorDate: Thu Apr 25 16:39:07 2019 +0200
Allow inference of CombiningValueState coders.
---
.../runners/portability/fn_api_runner_test.py | 3 +--
sdks/python/apache_beam/transforms/userstate.py | 18 ++++++++++++++++--
2 files changed, 17 insertions(+), 4 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 5049ff4..8646ef2 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -255,8 +255,7 @@ class FnApiRunnerTest(unittest.TestCase):
equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 'b')]))
def test_pardo_state_only(self):
- index_state_spec = userstate.CombiningValueStateSpec(
- 'index', beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)
+ index_state_spec = userstate.CombiningValueStateSpec('index', sum)
# TODO(ccy): State isn't detected with Map/FlatMap.
class AddIndex(beam.DoFn):
diff --git a/sdks/python/apache_beam/transforms/userstate.py b/sdks/python/apache_beam/transforms/userstate.py
index 891417b..6d98ec4 100644
--- a/sdks/python/apache_beam/transforms/userstate.py
+++ b/sdks/python/apache_beam/transforms/userstate.py
@@ -63,16 +63,30 @@ class BagStateSpec(StateSpec):
class CombiningValueStateSpec(StateSpec):
"""Specification for a user DoFn combining value state cell."""
- def __init__(self, name, coder, combine_fn):
+ def __init__(self, name, coder=None, combine_fn=None):
# Avoid circular import.
from apache_beam.transforms.core import CombineFn
+ # We want the coder to be optional, but unfortunately it comes
+ # before the non-optional combine_fn parameter, which we can't
+ # change for backwards compatibility reasons.
+ #
+ # Instead, allow it to be omitted (by either passing two arguments
+ # or combine_fn by keyword.)
+ if combine_fn is None:
+ if coder is None:
+ raise ValueError('combine_fn must be provided')
+ else:
+ coder, combine_fn = None, coder
+ self.combine_fn = CombineFn.maybe_from_callable(combine_fn)
+ if coder is None:
+ coder = self.combine_fn.get_accumulator_coder()
+
assert isinstance(name, str)
assert isinstance(coder, Coder)
self.name = name
# The coder here should be for the accumulator type of the given CombineFn.
self.coder = coder
- self.combine_fn = CombineFn.maybe_from_callable(combine_fn)
def to_runner_api(self, context):
return beam_runner_api_pb2.StateSpec(