You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/10 00:06:32 UTC

[GitHub] [beam] robertwb opened a new pull request #11963: Add relational GroupBy transform to Python.

robertwb opened a new pull request #11963:
URL: https://github.com/apache/beam/pull/11963


   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
   XLang | --- | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/)
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) 
   Portable | --- | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robertwb commented on a change in pull request #11963: Add relational GroupBy transform to Python.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11963:
URL: https://github.com/apache/beam/pull/11963#discussion_r453137842



##########
File path: sdks/python/apache_beam/typehints/native_type_compatibility.py
##########
@@ -85,7 +85,7 @@ def _safe_issubclass(derived, parent):
   """
   try:
     return issubclass(derived, parent)
-  except TypeError:
+  except (TypeError, AttributeError):

Review comment:
       I'm pretty sure I did see this. (Maybe it was missing `__mro__` or something?)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robertwb commented on pull request #11963: Add relational GroupBy transform to Python.

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #11963:
URL: https://github.com/apache/beam/pull/11963#issuecomment-657688336


   I've refactored this to use named tuples rather than rows. PTAL.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robertwb commented on pull request #11963: Add relational GroupBy transform to Python.

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #11963:
URL: https://github.com/apache/beam/pull/11963#issuecomment-655215071


   R: @TheNeuralBit This is ready for review.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit commented on a change in pull request #11963: Add relational GroupBy transform to Python.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #11963:
URL: https://github.com/apache/beam/pull/11963#discussion_r459031897



##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -2247,6 +2249,166 @@ def runner_api_requires_keyed_input(self):
     return True
 
 
+def _expr_to_callable(expr, pos):
+  if isinstance(expr, str):
+    return lambda x: getattr(x, expr)
+  elif callable(expr):
+    return expr
+  else:
+    raise TypeError(
+        'Field expression %r at %s must be a callable or a string.' %
+        (expr, pos))
+
+
+class GroupBy(PTransform):
+  """Groups a PCollection by one or more expressions, used to derive the key.
+
+  `GroupBy(expr)` is roughly equivalent to
+
+      beam.Map(lambda v: (expr(v), v)) | beam.GroupByKey()
+
+  but provides several conviniences, e.g.

Review comment:
       ```suggestion
     but provides several conveniences, e.g.
   ```

##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -87,7 +87,10 @@ def from_runner_api_parameter(schema, components, unused_context):
   @staticmethod
   def from_type_hint(type_hint, registry):
     if isinstance(type_hint, row_type.RowTypeConstraint):
-      schema = named_fields_to_schema(type_hint._fields)
+      try:
+        schema = named_fields_to_schema(type_hint._fields)
+      except ValueError:
+        return typecoders.registry.get_coder(object)

Review comment:
       Ah I see. I'd rather not have this escape hatch if we can avoid it, but I can imagine it would be frustrating for users who don't care about using Rows xlang if we just refuse to make a RowCoder for them. Perhaps we should make `named_fields_to_schema` wrap unknown type hints in a "pythonsdk" logical type that uses fast primitives coder. Then we can give a better error message if/when a xlang issue arises. If you just add a TODO here I can take that action.
   
   When running locally I noticed that this is only needed for your tests because we can't infer a type for `sign=x // abs(x) if x else 0` and fallback to `Any`. Just curious - is it possible to fix the inference for that function? It seems like we should be able to.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robertwb commented on a change in pull request #11963: Add relational GroupBy transform to Python.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11963:
URL: https://github.com/apache/beam/pull/11963#discussion_r453137898



##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -87,7 +87,10 @@ def from_runner_api_parameter(schema, components, unused_context):
   @staticmethod
   def from_type_hint(type_hint, registry):
     if isinstance(type_hint, row_type.RowTypeConstraint):
-      schema = named_fields_to_schema(type_hint._fields)
+      try:
+        schema = named_fields_to_schema(type_hint._fields)
+      except ValueError:
+        return typecoders.registry.get_coder(object)

Review comment:
       Well, it's also there to handle the case where we simply don't know what the type is. I'll see if I can re-work this to use named tuples (thanks for the idea about `__reduce__`). 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robertwb commented on pull request #11963: Add relational GroupBy transform to Python.

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #11963:
URL: https://github.com/apache/beam/pull/11963#issuecomment-663254641


   Run Python PreCommit


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robertwb commented on pull request #11963: Add relational GroupBy transform to Python.

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #11963:
URL: https://github.com/apache/beam/pull/11963#issuecomment-663252987


   Thanks. Filed a bug and added a TODO about a Python SDK logical type. I'll look at the inference separately. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit commented on a change in pull request #11963: Add relational GroupBy transform to Python.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #11963:
URL: https://github.com/apache/beam/pull/11963#discussion_r453052127



##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -2247,6 +2248,154 @@ def runner_api_requires_keyed_input(self):
     return True
 
 
+def _expr_to_callable(expr, pos):
+  if isinstance(expr, str):
+    return lambda x: getattr(x, expr)
+  elif callable(expr):
+    return expr
+  else:
+    raise TypeError(
+        'Field expression %r at %s must be a callable or a string.' %
+        (expr, pos))
+
+
+class GroupBy(PTransform):
+  """Groups a PCollection by one or more expressions, used to derive the key.
+
+  `GroupBy(expr)` is roughly equivalent to
+
+      beam.Map(lambda v: (expr(v), v)) | GroupByKey()

Review comment:
       ```suggestion
         beam.Map(lambda v: (expr(v), v)) | beam.GroupByKey()
   ```

##########
File path: sdks/python/apache_beam/typehints/native_type_compatibility.py
##########
@@ -85,7 +85,7 @@ def _safe_issubclass(derived, parent):
   """
   try:
     return issubclass(derived, parent)
-  except TypeError:
+  except (TypeError, AttributeError):

Review comment:
       Will issubclass actually raise an AttributeError? Curious what causes this

##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -87,7 +87,10 @@ def from_runner_api_parameter(schema, components, unused_context):
   @staticmethod
   def from_type_hint(type_hint, registry):
     if isinstance(type_hint, row_type.RowTypeConstraint):
-      schema = named_fields_to_schema(type_hint._fields)
+      try:
+        schema = named_fields_to_schema(type_hint._fields)
+      except ValueError:
+        return typecoders.registry.get_coder(object)

Review comment:
       Is this a fallback for when a Row uses types that we don't support in Python schemas?
   
   I worry about this since it makes it tricky for a user to tell when a PCollection can be used in an ExternalTransform that uses rows. Are there some specific types that we need to add coverage for?

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -2247,6 +2248,154 @@ def runner_api_requires_keyed_input(self):
     return True
 
 
+def _expr_to_callable(expr, pos):
+  if isinstance(expr, str):
+    return lambda x: getattr(x, expr)
+  elif callable(expr):
+    return expr
+  else:
+    raise TypeError(
+        'Field expression %r at %s must be a callable or a string.' %
+        (expr, pos))
+
+
+class GroupBy(PTransform):
+  """Groups a PCollection by one or more expressions, used to derive the key.
+
+  `GroupBy(expr)` is roughly equivalent to
+
+      beam.Map(lambda v: (expr(v), v)) | GroupByKey()
+
+  but provides several conviniences, e.g.
+
+      * Several arguments may be provided, as positional or keyword arguments,
+        resulting in a tuple-like key. For example `GroupBy(a=expr1, b=expr2)`
+        groups by a key with attributes `a` and `b` computed by applying
+        `expr1` and `expr2` to each element.
+
+      * Strings can be used as a shorthand for accessing an attribute, e.g.
+        `GroupBy('some_field')` is equivalent to
+        `GroupBy(lambda v: getattr(v, 'some_field'))`.
+
+  The GroupBy operation can be made into an aggregating operation by invoking
+  its `aggregate_field` method.
+  """
+  def __init__(
+      self,
+      *fields,  # type: typing.Union[str, callable]
+      **kwargs  # type: typing.Union[str, callable]
+    ):
+    if len(fields) == 1 and not kwargs:
+      self._force_tuple_keys = False
+      name = fields[0] if isinstance(fields[0], str) else 'key'
+      key_fields = [(name, _expr_to_callable(fields[0], 0))]
+    else:
+      self._force_tuple_keys = True
+      key_fields = []
+      for ix, field in enumerate(fields):
+        name = field if isinstance(field, str) else 'key%d' % ix
+        key_fields.append((name, _expr_to_callable(field, ix)))
+      for name, expr in kwargs.items():
+        key_fields.append((name, _expr_to_callable(expr, name)))
+    self._key_fields = key_fields
+    # TODO(robertwb): Pickling of dynamic named tuples.
+    # self._key_type = typing.NamedTuple(
+    #     'Key', [name for name, _ in self._key_fields])

Review comment:
       I had this problem with the NamedTuple used in an external transform's output PCollection, I solved it there by making a custom `__reduce__` function that rebuilds from the proto-encoded schema:
   https://github.com/apache/beam/blob/80de47655c90594eead2f51eece206e8c4fe0227/sdks/python/apache_beam/typehints/schemas.py#L210-L225
   
   (A TODO is fine as well, just wanted to point that out in case it's helpful)

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -2247,6 +2248,154 @@ def runner_api_requires_keyed_input(self):
     return True
 
 
+def _expr_to_callable(expr, pos):
+  if isinstance(expr, str):
+    return lambda x: getattr(x, expr)
+  elif callable(expr):
+    return expr
+  else:
+    raise TypeError(
+        'Field expression %r at %s must be a callable or a string.' %
+        (expr, pos))
+
+
+class GroupBy(PTransform):
+  """Groups a PCollection by one or more expressions, used to derive the key.
+
+  `GroupBy(expr)` is roughly equivalent to
+
+      beam.Map(lambda v: (expr(v), v)) | GroupByKey()
+
+  but provides several conviniences, e.g.
+
+      * Several arguments may be provided, as positional or keyword arguments,
+        resulting in a tuple-like key. For example `GroupBy(a=expr1, b=expr2)`
+        groups by a key with attributes `a` and `b` computed by applying
+        `expr1` and `expr2` to each element.
+
+      * Strings can be used as a shorthand for accessing an attribute, e.g.
+        `GroupBy('some_field')` is equivalent to
+        `GroupBy(lambda v: getattr(v, 'some_field'))`.
+
+  The GroupBy operation can be made into an aggregating operation by invoking
+  its `aggregate_field` method.
+  """
+  def __init__(
+      self,
+      *fields,  # type: typing.Union[str, callable]
+      **kwargs  # type: typing.Union[str, callable]
+    ):
+    if len(fields) == 1 and not kwargs:
+      self._force_tuple_keys = False
+      name = fields[0] if isinstance(fields[0], str) else 'key'
+      key_fields = [(name, _expr_to_callable(fields[0], 0))]
+    else:
+      self._force_tuple_keys = True
+      key_fields = []
+      for ix, field in enumerate(fields):
+        name = field if isinstance(field, str) else 'key%d' % ix
+        key_fields.append((name, _expr_to_callable(field, ix)))
+      for name, expr in kwargs.items():
+        key_fields.append((name, _expr_to_callable(expr, name)))
+    self._key_fields = key_fields
+    # TODO(robertwb): Pickling of dynamic named tuples.
+    # self._key_type = typing.NamedTuple(
+    #     'Key', [name for name, _ in self._key_fields])
+    self._key_type = lambda *values: pvalue.Row(
+        **{name: value
+           for (name, _), value in zip(self._key_fields, values)})
+
+  def aggregate_field(
+      self,
+      field,  # type: typing.Union[str, callable]
+      combine_fn,  # type: typing.Union[callable, CombineFn]
+      dest,  # type: str
+    ):
+    """Returns a grouping operation that also aggregates grouped values.
+
+    Args:
+      field: indicates the field to be aggregated
+      combine_fn: indicates the aggregation function to be used
+      dest: indicates the name that will be used for the aggregate in the output
+
+    May be called repeatedly to aggregate multiple fields, e.g.
+
+        GroupBy('key')
+            .aggregate_field('some_attr', sum, 'sum_attr')
+            .aggregate_field(lambda v: ..., MeanCombineFn, 'mean')
+    """
+    return _GroupAndAggregate(self, ()).aggregate_field(field, combine_fn, dest)
+
+  def force_tuple_keys(self, value=True):
+    """Forces the keys to always be tuple-like, even if there is only a single
+    expression.
+    """
+    res = copy.copy(self)
+    res._force_tuple_keys = value
+    return res
+
+  def _key_func(self):
+    if not self._force_tuple_keys and len(self._key_fields) == 1:
+      return self._key_fields[0][1]
+    else:
+      key_type = self._key_type
+      key_exprs = [expr for _, expr in self._key_fields]
+      return lambda element: key_type(*(expr(element) for expr in key_exprs))
+
+  def default_label(self):
+    return 'GroupBy(%s)' % ', '.join(name for name, _ in self._key_fields)
+
+  def expand(self, pcoll):
+    return pcoll | Map(lambda x: (self._key_func()(x), x)) | GroupByKey()
+
+
+class _GroupAndAggregate(PTransform):
+  def __init__(self, grouping, aggregations):
+    self._grouping = grouping
+    self._aggregations = aggregations
+
+  def aggregate_field(
+      self,
+      field,  # type: typing.Union[str, callable]
+      combine_fn,  # type: typing.Union[callable, CombineFn]
+      dest,  # type: str
+      ):
+    field = _expr_to_callable(field, 0)
+    return _GroupAndAggregate(
+        self._grouping, list(self._aggregations) + [(field, combine_fn, dest)])
+
+  def expand(self, pcoll):
+    from apache_beam.transforms.combiners import TupleCombineFn
+
+    # TODO(Py3): Use {**a, **b} syntax once Python 2 is gone.

Review comment:
       ```suggestion
       # TODO(BEAM-7372): Use {**a, **b} syntax once Python 2 is gone.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robertwb commented on pull request #11963: Add relational GroupBy transform to Python.

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #11963:
URL: https://github.com/apache/beam/pull/11963#issuecomment-642251071


   CC: @TheNeuralBit 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robertwb merged pull request #11963: Add relational GroupBy transform to Python.

Posted by GitBox <gi...@apache.org>.
robertwb merged pull request #11963:
URL: https://github.com/apache/beam/pull/11963


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robertwb commented on pull request #11963: Add relational GroupBy transform to Python.

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #11963:
URL: https://github.com/apache/beam/pull/11963#issuecomment-663293495


   :sdks:python:test-suites:dataflow:py2:preCommitIT_streaming is unrelated.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org