You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/10 20:30:46 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #11963: Add relational GroupBy transform to Python.

TheNeuralBit commented on a change in pull request #11963:
URL: https://github.com/apache/beam/pull/11963#discussion_r453052127



##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -2247,6 +2248,154 @@ def runner_api_requires_keyed_input(self):
     return True
 
 
+def _expr_to_callable(expr, pos):
+  if isinstance(expr, str):
+    return lambda x: getattr(x, expr)
+  elif callable(expr):
+    return expr
+  else:
+    raise TypeError(
+        'Field expression %r at %s must be a callable or a string.' %
+        (expr, pos))
+
+
+class GroupBy(PTransform):
+  """Groups a PCollection by one or more expressions, used to derive the key.
+
+  `GroupBy(expr)` is roughly equivalent to
+
+      beam.Map(lambda v: (expr(v), v)) | GroupByKey()

Review comment:
       ```suggestion
         beam.Map(lambda v: (expr(v), v)) | beam.GroupByKey()
   ```

##########
File path: sdks/python/apache_beam/typehints/native_type_compatibility.py
##########
@@ -85,7 +85,7 @@ def _safe_issubclass(derived, parent):
   """
   try:
     return issubclass(derived, parent)
-  except TypeError:
+  except (TypeError, AttributeError):

Review comment:
       Will `issubclass` actually raise an `AttributeError`? I'm curious what causes this.

##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -87,7 +87,10 @@ def from_runner_api_parameter(schema, components, unused_context):
   @staticmethod
   def from_type_hint(type_hint, registry):
     if isinstance(type_hint, row_type.RowTypeConstraint):
-      schema = named_fields_to_schema(type_hint._fields)
+      try:
+        schema = named_fields_to_schema(type_hint._fields)
+      except ValueError:
+        return typecoders.registry.get_coder(object)

Review comment:
       Is this a fallback for when a Row uses types that we don't support in Python schemas?
   
   I worry about this since it makes it tricky for a user to tell when a PCollection can be used in an ExternalTransform that uses rows. Are there some specific types that we need to add coverage for?

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -2247,6 +2248,154 @@ def runner_api_requires_keyed_input(self):
     return True
 
 
+def _expr_to_callable(expr, pos):
+  if isinstance(expr, str):
+    return lambda x: getattr(x, expr)
+  elif callable(expr):
+    return expr
+  else:
+    raise TypeError(
+        'Field expression %r at %s must be a callable or a string.' %
+        (expr, pos))
+
+
+class GroupBy(PTransform):
+  """Groups a PCollection by one or more expressions, used to derive the key.
+
+  `GroupBy(expr)` is roughly equivalent to
+
+      beam.Map(lambda v: (expr(v), v)) | GroupByKey()
+
+  but provides several conviniences, e.g.
+
+      * Several arguments may be provided, as positional or keyword arguments,
+        resulting in a tuple-like key. For example `GroupBy(a=expr1, b=expr2)`
+        groups by a key with attributes `a` and `b` computed by applying
+        `expr1` and `expr2` to each element.
+
+      * Strings can be used as a shorthand for accessing an attribute, e.g.
+        `GroupBy('some_field')` is equivalent to
+        `GroupBy(lambda v: getattr(v, 'some_field'))`.
+
+  The GroupBy operation can be made into an aggregating operation by invoking
+  its `aggregate_field` method.
+  """
+  def __init__(
+      self,
+      *fields,  # type: typing.Union[str, callable]
+      **kwargs  # type: typing.Union[str, callable]
+    ):
+    if len(fields) == 1 and not kwargs:
+      self._force_tuple_keys = False
+      name = fields[0] if isinstance(fields[0], str) else 'key'
+      key_fields = [(name, _expr_to_callable(fields[0], 0))]
+    else:
+      self._force_tuple_keys = True
+      key_fields = []
+      for ix, field in enumerate(fields):
+        name = field if isinstance(field, str) else 'key%d' % ix
+        key_fields.append((name, _expr_to_callable(field, ix)))
+      for name, expr in kwargs.items():
+        key_fields.append((name, _expr_to_callable(expr, name)))
+    self._key_fields = key_fields
+    # TODO(robertwb): Pickling of dynamic named tuples.
+    # self._key_type = typing.NamedTuple(
+    #     'Key', [name for name, _ in self._key_fields])

Review comment:
       I had this problem with the NamedTuple used in an external transform's output PCollection. I solved it there by making a custom `__reduce__` function that rebuilds it from the proto-encoded schema:
   https://github.com/apache/beam/blob/80de47655c90594eead2f51eece206e8c4fe0227/sdks/python/apache_beam/typehints/schemas.py#L210-L225
   
   (A TODO is fine as well, just wanted to point that out in case it's helpful)

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -2247,6 +2248,154 @@ def runner_api_requires_keyed_input(self):
     return True
 
 
+def _expr_to_callable(expr, pos):
+  if isinstance(expr, str):
+    return lambda x: getattr(x, expr)
+  elif callable(expr):
+    return expr
+  else:
+    raise TypeError(
+        'Field expression %r at %s must be a callable or a string.' %
+        (expr, pos))
+
+
+class GroupBy(PTransform):
+  """Groups a PCollection by one or more expressions, used to derive the key.
+
+  `GroupBy(expr)` is roughly equivalent to
+
+      beam.Map(lambda v: (expr(v), v)) | GroupByKey()
+
+  but provides several conviniences, e.g.
+
+      * Several arguments may be provided, as positional or keyword arguments,
+        resulting in a tuple-like key. For example `GroupBy(a=expr1, b=expr2)`
+        groups by a key with attributes `a` and `b` computed by applying
+        `expr1` and `expr2` to each element.
+
+      * Strings can be used as a shorthand for accessing an attribute, e.g.
+        `GroupBy('some_field')` is equivalent to
+        `GroupBy(lambda v: getattr(v, 'some_field'))`.
+
+  The GroupBy operation can be made into an aggregating operation by invoking
+  its `aggregate_field` method.
+  """
+  def __init__(
+      self,
+      *fields,  # type: typing.Union[str, callable]
+      **kwargs  # type: typing.Union[str, callable]
+    ):
+    if len(fields) == 1 and not kwargs:
+      self._force_tuple_keys = False
+      name = fields[0] if isinstance(fields[0], str) else 'key'
+      key_fields = [(name, _expr_to_callable(fields[0], 0))]
+    else:
+      self._force_tuple_keys = True
+      key_fields = []
+      for ix, field in enumerate(fields):
+        name = field if isinstance(field, str) else 'key%d' % ix
+        key_fields.append((name, _expr_to_callable(field, ix)))
+      for name, expr in kwargs.items():
+        key_fields.append((name, _expr_to_callable(expr, name)))
+    self._key_fields = key_fields
+    # TODO(robertwb): Pickling of dynamic named tuples.
+    # self._key_type = typing.NamedTuple(
+    #     'Key', [name for name, _ in self._key_fields])
+    self._key_type = lambda *values: pvalue.Row(
+        **{name: value
+           for (name, _), value in zip(self._key_fields, values)})
+
+  def aggregate_field(
+      self,
+      field,  # type: typing.Union[str, callable]
+      combine_fn,  # type: typing.Union[callable, CombineFn]
+      dest,  # type: str
+    ):
+    """Returns a grouping operation that also aggregates grouped values.
+
+    Args:
+      field: indicates the field to be aggregated
+      combine_fn: indicates the aggregation function to be used
+      dest: indicates the name that will be used for the aggregate in the output
+
+    May be called repeatedly to aggregate multiple fields, e.g.
+
+        GroupBy('key')
+            .aggregate_field('some_attr', sum, 'sum_attr')
+            .aggregate_field(lambda v: ..., MeanCombineFn, 'mean')
+    """
+    return _GroupAndAggregate(self, ()).aggregate_field(field, combine_fn, dest)
+
+  def force_tuple_keys(self, value=True):
+    """Forces the keys to always be tuple-like, even if there is only a single
+    expression.
+    """
+    res = copy.copy(self)
+    res._force_tuple_keys = value
+    return res
+
+  def _key_func(self):
+    if not self._force_tuple_keys and len(self._key_fields) == 1:
+      return self._key_fields[0][1]
+    else:
+      key_type = self._key_type
+      key_exprs = [expr for _, expr in self._key_fields]
+      return lambda element: key_type(*(expr(element) for expr in key_exprs))
+
+  def default_label(self):
+    return 'GroupBy(%s)' % ', '.join(name for name, _ in self._key_fields)
+
+  def expand(self, pcoll):
+    return pcoll | Map(lambda x: (self._key_func()(x), x)) | GroupByKey()
+
+
+class _GroupAndAggregate(PTransform):
+  def __init__(self, grouping, aggregations):
+    self._grouping = grouping
+    self._aggregations = aggregations
+
+  def aggregate_field(
+      self,
+      field,  # type: typing.Union[str, callable]
+      combine_fn,  # type: typing.Union[callable, CombineFn]
+      dest,  # type: str
+      ):
+    field = _expr_to_callable(field, 0)
+    return _GroupAndAggregate(
+        self._grouping, list(self._aggregations) + [(field, combine_fn, dest)])
+
+  def expand(self, pcoll):
+    from apache_beam.transforms.combiners import TupleCombineFn
+
+    # TODO(Py3): Use {**a, **b} syntax once Python 2 is gone.

Review comment:
       ```suggestion
       # TODO(BEAM-7372): Use {**a, **b} syntax once Python 2 is gone.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org