You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/03/10 07:51:03 UTC

[GitHub] [beam] robertwb commented on a change in pull request #14135: [BEAM-11881] Fix partitioning ordering for DataFrames

robertwb commented on a change in pull request #14135:
URL: https://github.com/apache/beam/pull/14135#discussion_r591137449



##########
File path: sdks/python/apache_beam/dataframe/expressions.py
##########
@@ -281,7 +321,7 @@ def requires_partition_by(self):
     return partitionings.Nothing()
 
   def preserves_partition_by(self):
-    return partitionings.Nothing()
+    return partitionings.Index()

Review comment:
       Similarly. 

##########
File path: sdks/python/apache_beam/dataframe/expressions.py
##########
@@ -245,7 +285,7 @@ def requires_partition_by(self):
     return partitionings.Nothing()
 
   def preserves_partition_by(self):
-    return partitionings.Nothing()
+    return partitionings.Index()

Review comment:
       In the new hierarchy, Nothing should still be valid, right? Maybe that just sounds weird though. (E.g. one may require nothing and preserve everything.) 

##########
File path: sdks/python/apache_beam/dataframe/expressions_test.py
##########
@@ -53,6 +54,61 @@ def test_expression_proxy_error(self):
     with self.assertRaises(TypeError):
       expressions.ComputedExpression('add', lambda a, b: a + b, [a, b])
 
+  def test_output_partitioning_preserves_singleton(self):

Review comment:
       This reads like "test that output partitioning preserves singleton partitioning." Maybe rename to `test_preserves_singleton_ouptput_partitioning`. Similarly elsewhere. 

##########
File path: sdks/python/apache_beam/dataframe/expressions_test.py
##########
@@ -53,6 +54,61 @@ def test_expression_proxy_error(self):
     with self.assertRaises(TypeError):
       expressions.ComputedExpression('add', lambda a, b: a + b, [a, b])
 
+  def test_output_partitioning_preserves_singleton(self):
+    a = expressions.PlaceholderExpression(1)
+    b = expressions.PlaceholderExpression(2)
+
+    preserves_singleton = expressions.ComputedExpression(
+        'add',
+        lambda a,

Review comment:
       Put this in ()'s or suppress yapf. (Or maybe just pass None, or a dummy `lambda: None`)

##########
File path: sdks/python/apache_beam/dataframe/expressions.py
##########
@@ -312,6 +352,11 @@ def __init__(
       requires_partition_by: The required (common) partitioning of the args.
       preserves_partition_by: The level of partitioning preserved.
     """
+    assert preserves_partition_by != partitionings.Nothing(), (
+        "Preserving Nothing() partitioning is not allowed. Any expression "
+        "can trivially preserve at least Singleton() partitioning. If you "
+        "intend to indicate this expression can preserve _any_ partioning, use "
+        "Index() instead.")

Review comment:
       What if we renamed Nothing to Arbitrary. Then it would make sense to preserve Arbitrary partitioning, and also require Arbitrary partitioning. 

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -331,8 +339,7 @@ def expr_to_stages(expr):
       required_partitioning = expr.requires_partition_by()
       for stage in common_stages([expr_to_stages(arg) for arg in expr.args()
                                   if arg not in inputs]):
-        if all(output_is_partitioned_by(arg, stage, required_partitioning)
-               for arg in expr.args() if not is_scalar(arg)):
+        if output_partitioning_in_stage(expr, stage) is not None:

Review comment:
       +1 to combining the logic. Maybe a helper `is_computable_in_stage(expr, stage)` whose implementation is `output_partitioning_in_stage(expr, stage) is not None` would make this easier to follow. 

##########
File path: sdks/python/apache_beam/dataframe/expressions.py
##########
@@ -312,6 +352,11 @@ def __init__(
       requires_partition_by: The required (common) partitioning of the args.
       preserves_partition_by: The level of partitioning preserved.
     """
+    assert preserves_partition_by != partitionings.Nothing(), (
+        "Preserving Nothing() partitioning is not allowed. Any expression "
+        "can trivially preserve at least Singleton() partitioning. If you "
+        "intend to indicate this expression can preserve _any_ partioning, use "
+        "Index() instead.")

Review comment:
       This assumes we will never have anything between Index and Nothing. Maybe that's a fair assumption. 

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -282,30 +282,38 @@ def __repr__(self, indent=0):
                 self.outputs))
 
     # First define some helper functions.
-    def output_is_partitioned_by(expr, stage, partitioning):
-      if partitioning == partitionings.Nothing():
-        # Always satisfied.
-        return True
-      elif stage.partitioning == partitionings.Singleton():
-        # Within a stage, the singleton partitioning is trivially preserved.
-        return True
-      elif expr in stage.inputs:
+    def output_partitioning_in_stage(expr, stage):
+      """ Return the output partitioning of expr when computed in stage,

Review comment:
       Remove leading space.

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -282,30 +282,38 @@ def __repr__(self, indent=0):
                 self.outputs))
 
     # First define some helper functions.
-    def output_is_partitioned_by(expr, stage, partitioning):
-      if partitioning == partitionings.Nothing():
-        # Always satisfied.
-        return True
-      elif stage.partitioning == partitionings.Singleton():
-        # Within a stage, the singleton partitioning is trivially preserved.
-        return True
-      elif expr in stage.inputs:
+    def output_partitioning_in_stage(expr, stage):
+      """ Return the output partitioning of expr when computed in stage,
+      or returns None if the expression cannot be computed in this stage.
+      """
+      if expr in stage.inputs or expr in inputs:
         # Inputs are all partitioned by stage.partitioning.
-        return stage.partitioning.is_subpartitioning_of(partitioning)
-      elif expr.preserves_partition_by().is_subpartitioning_of(partitioning):
-        # Here expr preserves at least the requested partitioning; its outputs
-        # will also have this partitioning iff its inputs do.
-        if expr.requires_partition_by().is_subpartitioning_of(partitioning):
-          # If expr requires at least this partitioning, we will arrange such
-          # that its inputs satisfy this.
-          return True
-        else:
-          # Otherwise, recursively check all the inputs.
-          return all(
-              output_is_partitioned_by(arg, stage, partitioning)
-              for arg in expr.args())
-      else:
-        return False
+        return stage.partitioning
+
+      # Anything that's not an input must have arguments
+      assert len(expr.args())
+
+      arg_partitionings = set(
+          output_partitioning_in_stage(arg, stage) for arg in expr.args()
+          if not is_scalar(arg))
+
+      # TODO: what does it mean for all args to be 0? only scalar arguments? the

Review comment:
       len(arg_partitionings) to be 0? Yes, I think in that case this should be a scalar. Did we not run into this case? 

##########
File path: sdks/python/apache_beam/dataframe/partitionings_test.py
##########
@@ -36,8 +36,8 @@ class PartitioningsTest(unittest.TestCase):
   }).set_index(['shape', 'color', 'size'])
 
   def test_index_is_subpartition(self):
-    ordered_list = [Nothing(), Index([3]), Index([1, 3]), Index(), Singleton()]
-    for loose, strict in zip(ordered_list[:1], ordered_list[1:]):
+    ordered_list = [Singleton(), Index([3]), Index([1, 3]), Index(), Nothing()]
+    for loose, strict in zip(ordered_list[:-1], ordered_list[1:]):

Review comment:
       Ack.

##########
File path: sdks/python/apache_beam/dataframe/expressions_test.py
##########
@@ -53,6 +54,61 @@ def test_expression_proxy_error(self):
     with self.assertRaises(TypeError):
       expressions.ComputedExpression('add', lambda a, b: a + b, [a, b])
 
+  def test_output_partitioning_preserves_singleton(self):
+    a = expressions.PlaceholderExpression(1)
+    b = expressions.PlaceholderExpression(2)
+
+    preserves_singleton = expressions.ComputedExpression(
+        'add',
+        lambda a,
+        b: a + b, [a, b],
+        requires_partition_by=partitionings.Nothing(),
+        preserves_partition_by=partitionings.Singleton())
+
+    for partitioning in (partitionings.Singleton(), ):
+      self.assertEqual(
+          expressions.output_partitioning(preserves_singleton, partitioning),
+          partitioning,
+          f"Should preserve {partitioning}")
+
+    for partitioning in (partitionings.Index([0]),
+                         partitionings.Index(),
+                         partitionings.Nothing()):
+      self.assertEqual(
+          expressions.output_partitioning(preserves_singleton, partitioning),
+          partitionings.Nothing(),
+          f"Should NOT preserve {partitioning}")
+
+  def test_output_partitioning_preserves_index(self):
+    a = expressions.PlaceholderExpression(1)
+    b = expressions.PlaceholderExpression(2)
+
+    preserves_index = expressions.ComputedExpression(

Review comment:
       `preserves_partial_index`? Also, an example would be helpful (e.g. something that replaces level 2 of the index with a column).

##########
File path: sdks/python/apache_beam/dataframe/expressions_test.py
##########
@@ -53,6 +54,61 @@ def test_expression_proxy_error(self):
     with self.assertRaises(TypeError):
       expressions.ComputedExpression('add', lambda a, b: a + b, [a, b])
 
+  def test_output_partitioning_preserves_singleton(self):
+    a = expressions.PlaceholderExpression(1)
+    b = expressions.PlaceholderExpression(2)
+
+    preserves_singleton = expressions.ComputedExpression(

Review comment:
       Maybe call this `preserves_only_singleton`? And give an example (like setting the index from a column). 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org