Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/11/21 00:41:38 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #13401: Add additional verification in PartitioningSession

TheNeuralBit commented on a change in pull request #13401:
URL: https://github.com/apache/beam/pull/13401#discussion_r528040010



##########
File path: sdks/python/apache_beam/dataframe/expressions.py
##########
@@ -67,28 +72,57 @@ def is_scalar(expr):
         result = super(PartitioningSession, self).evaluate(expr)
       else:
         scaler_args = [arg for arg in expr.args() if is_scalar(arg)]
-        parts = collections.defaultdict(
-            lambda: Session({arg: self.evaluate(arg)
-                             for arg in scaler_args}))
-        for arg in expr.args():
-          if not is_scalar(arg):
-            input = self.evaluate(arg)
-            for key, part in expr.requires_partition_by().test_partition_fn(
-                input):
-              parts[key]._bindings[arg] = part
-        if not parts:
-          parts[None]  # Create at least one entry.
-
-        results = []
-        for session in parts.values():
-          if any(len(session.lookup(arg)) for arg in expr.args()
-                 if not is_scalar(arg)):
-            results.append(session.evaluate(expr))
-        if results:
-          result = pd.concat(results)
-        else:
-          # Choose any single session.
-          result = next(iter(parts.values())).evaluate(expr)
+
+        def evaluate_with(input_partitioning):
+          parts = collections.defaultdict(
+              lambda: Session({arg: self.evaluate(arg)
+                               for arg in scaler_args}))
+          for arg in expr.args():
+            if not is_scalar(arg):
+              input = self.evaluate(arg)
+              for key, part in input_partitioning.test_partition_fn(input):
+                parts[key]._bindings[arg] = part
+          if not parts:
+            parts[None]  # Create at least one entry.
+
+          results = []
+          for session in parts.values():
+            if any(len(session.lookup(arg)) for arg in expr.args()
+                   if not is_scalar(arg)):
+              results.append(session.evaluate(expr))
+
+          expected_output_partitioning = expr.preserves_partition_by(
+          ) if input_partitioning.is_subpartitioning_of(
+              expr.preserves_partition_by()) else input_partitioning
+
+          if not expected_output_partitioning.check(results):
+            raise AssertionError(
+                f"""Expression does not preserve partitioning!
+                Expression: {expr}
+                Requires: {expr.requires_partition_by()}
+                Preserves: {expr.preserves_partition_by()}
+                Input partitioning: {input_partitioning}
+                Expected output partitioning: {expected_output_partitioning}
+                """)
+
+          if results:
+            return pd.concat(results)
+          else:
+            # Choose any single session.
+            return next(iter(parts.values())).evaluate(expr)
+
+        input_partitioning = expr.requires_partition_by()
+
+        while input_partitioning is not None:
+          result = evaluate_with(input_partitioning)

Review comment:
       Note that this only stores the most recently computed `result`. We may also want to verify that each input partitioning yields equivalent results (modulo ordering).
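
       A rough sketch of such a check (hypothetical: `results_by_partitioning`
       stands in for a dict collecting the `result` of each pass through the
       loop, keyed by the candidate partitioning; the sketch also assumes the
       results are DataFrames whose indexes order rows deterministically):

           import pandas as pd

           def assert_results_equivalent(results_by_partitioning):
             # Normalize row order before comparing: partitioned evaluation
             # may concatenate the per-partition pieces in any order.
             normalized = {
                 partitioning: result.sort_index()
                 for partitioning, result in results_by_partitioning.items()
             }
             items = iter(normalized.items())
             baseline_partitioning, baseline = next(items)
             for partitioning, result in items:
               # check_like=True additionally ignores column ordering.
               pd.testing.assert_frame_equal(
                   baseline,
                   result,
                   check_like=True,
                   obj=f'{partitioning} vs {baseline_partitioning}')

       The loop body would then stash each result, e.g.
       `results_by_partitioning[input_partitioning] = evaluate_with(input_partitioning)`,
       and the assertion would run once after the loop exits.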




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org