You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/02 20:09:10 UTC

[beam] branch master updated: Allow inference of CombiningValueState coders.

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d4771b  Allow inference of CombiningValueState coders.
     new a434c9e  Merge pull request #8402 from robertwb/infer-state-coder
6d4771b is described below

commit 6d4771b7cdac7e6b0a6cdcca000fd4bcf455bbef
Author: Robert Bradshaw <ro...@google.com>
AuthorDate: Thu Apr 25 16:39:07 2019 +0200

    Allow inference of CombiningValueState coders.
---
 .../runners/portability/fn_api_runner_test.py          |  3 +--
 sdks/python/apache_beam/transforms/userstate.py        | 18 ++++++++++++++++--
 2 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 5049ff4..8646ef2 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -255,8 +255,7 @@ class FnApiRunnerTest(unittest.TestCase):
           equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 'b')]))
 
   def test_pardo_state_only(self):
-    index_state_spec = userstate.CombiningValueStateSpec(
-        'index', beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)
+    index_state_spec = userstate.CombiningValueStateSpec('index', sum)
 
     # TODO(ccy): State isn't detected with Map/FlatMap.
     class AddIndex(beam.DoFn):
diff --git a/sdks/python/apache_beam/transforms/userstate.py b/sdks/python/apache_beam/transforms/userstate.py
index 891417b..6d98ec4 100644
--- a/sdks/python/apache_beam/transforms/userstate.py
+++ b/sdks/python/apache_beam/transforms/userstate.py
@@ -63,16 +63,30 @@ class BagStateSpec(StateSpec):
 class CombiningValueStateSpec(StateSpec):
   """Specification for a user DoFn combining value state cell."""
 
-  def __init__(self, name, coder, combine_fn):
+  def __init__(self, name, coder=None, combine_fn=None):
     # Avoid circular import.
     from apache_beam.transforms.core import CombineFn
 
+    # We want the coder to be optional, but unfortunately it comes
+    # before the non-optional combine_fn parameter, which we can't
+    # change for backwards compatibility reasons.
+    #
+    # Instead, allow it to be omitted, either by passing only two positional
+    # arguments (name, combine_fn) or by passing combine_fn by keyword.
+    if combine_fn is None:
+      if coder is None:
+        raise ValueError('combine_fn must be provided')
+      else:
+        coder, combine_fn = None, coder
+    self.combine_fn = CombineFn.maybe_from_callable(combine_fn)
+    if coder is None:
+      coder = self.combine_fn.get_accumulator_coder()
+
     assert isinstance(name, str)
     assert isinstance(coder, Coder)
     self.name = name
     # The coder here should be for the accumulator type of the given CombineFn.
     self.coder = coder
-    self.combine_fn = CombineFn.maybe_from_callable(combine_fn)
 
   def to_runner_api(self, context):
     return beam_runner_api_pb2.StateSpec(