You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/21 20:24:53 UTC

[GitHub] [spark] grundprinzip commented on a diff in pull request #39149: [SPARK-41292][SPARK-41640][SPARK-41641][CONNECT][PYTHON] Implement `Window` functions

grundprinzip commented on code in PR #39149:
URL: https://github.com/apache/spark/pull/39149#discussion_r1054803637


##########
python/pyspark/sql/connect/column.py:
##########
@@ -568,6 +569,98 @@ def __repr__(self) -> str:
         return f"(LambdaFunction({str(self._function)}, {', '.join(self._arguments)})"
 
 
+class WindowExpression(Expression):
+    def __init__(
+        self,
+        windowFunction: Expression,
+        windowSpec: "WindowSpecType",
+    ) -> None:
+        super().__init__()
+
+        from pyspark.sql.connect.window import WindowSpec
+
+        assert windowFunction is not None and isinstance(windowFunction, Expression)
+
+        assert windowSpec is not None and isinstance(windowSpec, WindowSpec)
+
+        self._windowFunction = windowFunction
+
+        self._windowSpec = windowSpec
+
+    def to_plan(self, session: "SparkConnectClient") -> proto.Expression:
+        expr = proto.Expression()
+

Review Comment:
   Remove this blank line.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -681,6 +693,70 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformWindowExpression(window: proto.Expression.Window) = {

Review Comment:
   While the code is nicely structured, it is quite wide. I'm wondering whether we can restructure it a bit or add more documentation.



##########
connector/connect/common/src/main/protobuf/spark/connect/expressions.proto:
##########
@@ -39,6 +39,63 @@ message Expression {
     UnresolvedRegex unresolved_regex = 8;
     SortOrder sort_order = 9;
     LambdaFunction lambda_function = 10;
+    Window window = 11;
+  }
+
+
+  // Expression for the OVER clause or WINDOW clause.
+  message Window {
+
+    // (Required) The window function.
+    Expression window_function = 1;
+
+    // (Optional) The way that input rows are partitioned.
+    repeated Expression partition_spec = 2;
+
+    // (Optional) Ordering of rows in a partition.
+    repeated SortOrder order_spec = 3;
+
+    // (Optional) Window frame in a partition.
+    //
+    // If not set, it will be treated as 'UnspecifiedFrame'.
+    WindowFrame frame_spec = 4;
+
+    // The window frame
+    message WindowFrame {
+
+      // (Required) The type of the frame.
+      FrameType frame_type = 1;
+
+      // (Required) The lower bound of the frame.
+      FrameBoundary lower = 2;
+
+      // (Required) The upper bound of the frame.
+      FrameBoundary upper = 3;
+
+      enum FrameType {
+        // RowFrame treats rows in a partition individually.
+        ROW_FRAME = 0;
+
+        // RangeFrame treats rows in a partition as groups of peers.
+        // All rows having the same 'ORDER BY' ordering are considered as peers.
+        RANGE_FRAME = 1;
+      }

Review Comment:
   This change follows the Protocol Buffers style guide for enums.



##########
python/pyspark/sql/connect/column.py:
##########
@@ -568,6 +569,98 @@ def __repr__(self) -> str:
         return f"(LambdaFunction({str(self._function)}, {', '.join(self._arguments)})"
 
 
+class WindowExpression(Expression):
+    def __init__(
+        self,
+        windowFunction: Expression,
+        windowSpec: "WindowSpecType",
+    ) -> None:
+        super().__init__()
+
+        from pyspark.sql.connect.window import WindowSpec
+
+        assert windowFunction is not None and isinstance(windowFunction, Expression)
+
+        assert windowSpec is not None and isinstance(windowSpec, WindowSpec)
+
+        self._windowFunction = windowFunction
+
+        self._windowSpec = windowSpec
+
+    def to_plan(self, session: "SparkConnectClient") -> proto.Expression:
+        expr = proto.Expression()
+
+        expr.window.window_function.CopyFrom(self._windowFunction.to_plan(session))
+
+        if len(self._windowSpec._partitionSpec) > 0:
+            expr.window.partition_spec.extend(
+                [p.to_plan(session) for p in self._windowSpec._partitionSpec]
+            )
+        else:
+            warnings.warn(
+                "WARN WindowExpression: No Partition Defined for Window operation! "
+                "Moving all data to a single partition, this can cause serious "
+                "performance degradation."
+            )
+
+        if len(self._windowSpec._orderSpec) > 0:
+            expr.window.order_spec.extend(
+                [s.to_plan(session).sort_order for s in self._windowSpec._orderSpec]
+            )
+
+        if self._windowSpec._frame is not None:
+            if self._windowSpec._frame._isRowFrame:
+                expr.window.frame_spec.frame_type = (
+                    proto.Expression.Window.WindowFrame.FrameType.ROW_FRAME
+                )
+
+                start = self._windowSpec._frame._start
+                if start == 0:
+                    expr.window.frame_spec.lower.current_row = True
+                elif start == JVM_LONG_MIN:
+                    expr.window.frame_spec.lower.unbounded = True
+                elif JVM_INT_MIN <= start <= JVM_INT_MAX:
+                    expr.window.frame_spec.lower.value.literal.integer = start
+                else:
+                    raise ValueError(f"start is out of bound: {start}")
+
+                end = self._windowSpec._frame._end
+                if end == 0:
+                    expr.window.frame_spec.upper.current_row = True
+                elif end == JVM_LONG_MAX:
+                    expr.window.frame_spec.upper.unbounded = True
+                elif JVM_INT_MIN <= end <= JVM_INT_MAX:
+                    expr.window.frame_spec.upper.value.literal.integer = end
+                else:
+                    raise ValueError(f"end is out of bound: {end}")
+
+            else:
+                expr.window.frame_spec.frame_type = (
+                    proto.Expression.Window.WindowFrame.FrameType.RANGE_FRAME
+                )
+
+                start = self._windowSpec._frame._start
+                if start == 0:
+                    expr.window.frame_spec.lower.current_row = True
+                elif start == JVM_LONG_MIN:
+                    expr.window.frame_spec.lower.unbounded = True
+                else:
+                    expr.window.frame_spec.lower.value.literal.long = start
+
+                end = self._windowSpec._frame._end
+                if end == 0:
+                    expr.window.frame_spec.upper.current_row = True
+                elif end == JVM_LONG_MAX:

Review Comment:
   ```suggestion
                   elif end >= JVM_LONG_MAX:
   ```
   
   ?



##########
python/pyspark/sql/connect/column.py:
##########
@@ -568,6 +569,98 @@ def __repr__(self) -> str:
         return f"(LambdaFunction({str(self._function)}, {', '.join(self._arguments)})"
 
 
+class WindowExpression(Expression):
+    def __init__(
+        self,
+        windowFunction: Expression,
+        windowSpec: "WindowSpecType",
+    ) -> None:
+        super().__init__()
+
+        from pyspark.sql.connect.window import WindowSpec
+
+        assert windowFunction is not None and isinstance(windowFunction, Expression)
+
+        assert windowSpec is not None and isinstance(windowSpec, WindowSpec)
+
+        self._windowFunction = windowFunction
+
+        self._windowSpec = windowSpec
+
+    def to_plan(self, session: "SparkConnectClient") -> proto.Expression:
+        expr = proto.Expression()
+
+        expr.window.window_function.CopyFrom(self._windowFunction.to_plan(session))
+
+        if len(self._windowSpec._partitionSpec) > 0:
+            expr.window.partition_spec.extend(
+                [p.to_plan(session) for p in self._windowSpec._partitionSpec]
+            )
+        else:
+            warnings.warn(
+                "WARN WindowExpression: No Partition Defined for Window operation! "
+                "Moving all data to a single partition, this can cause serious "
+                "performance degradation."
+            )
+
+        if len(self._windowSpec._orderSpec) > 0:
+            expr.window.order_spec.extend(
+                [s.to_plan(session).sort_order for s in self._windowSpec._orderSpec]
+            )
+
+        if self._windowSpec._frame is not None:
+            if self._windowSpec._frame._isRowFrame:
+                expr.window.frame_spec.frame_type = (
+                    proto.Expression.Window.WindowFrame.FrameType.ROW_FRAME
+                )
+
+                start = self._windowSpec._frame._start
+                if start == 0:
+                    expr.window.frame_spec.lower.current_row = True
+                elif start == JVM_LONG_MIN:
+                    expr.window.frame_spec.lower.unbounded = True
+                elif JVM_INT_MIN <= start <= JVM_INT_MAX:
+                    expr.window.frame_spec.lower.value.literal.integer = start
+                else:
+                    raise ValueError(f"start is out of bound: {start}")
+
+                end = self._windowSpec._frame._end
+                if end == 0:
+                    expr.window.frame_spec.upper.current_row = True
+                elif end == JVM_LONG_MAX:
+                    expr.window.frame_spec.upper.unbounded = True
+                elif JVM_INT_MIN <= end <= JVM_INT_MAX:
+                    expr.window.frame_spec.upper.value.literal.integer = end
+                else:
+                    raise ValueError(f"end is out of bound: {end}")
+
+            else:
+                expr.window.frame_spec.frame_type = (
+                    proto.Expression.Window.WindowFrame.FrameType.RANGE_FRAME
+                )
+
+                start = self._windowSpec._frame._start
+                if start == 0:
+                    expr.window.frame_spec.lower.current_row = True
+                elif start == JVM_LONG_MIN:

Review Comment:
   ```suggestion
                   elif start <= JVM_LONG_MIN:
   ```
   
   ?



##########
connector/connect/common/src/main/protobuf/spark/connect/expressions.proto:
##########
@@ -39,6 +39,63 @@ message Expression {
     UnresolvedRegex unresolved_regex = 8;
     SortOrder sort_order = 9;
     LambdaFunction lambda_function = 10;
+    Window window = 11;
+  }
+
+
+  // Expression for the OVER clause or WINDOW clause.
+  message Window {
+
+    // (Required) The window function.
+    Expression window_function = 1;
+
+    // (Optional) The way that input rows are partitioned.
+    repeated Expression partition_spec = 2;
+
+    // (Optional) Ordering of rows in a partition.
+    repeated SortOrder order_spec = 3;
+
+    // (Optional) Window frame in a partition.
+    //
+    // If not set, it will be treated as 'UnspecifiedFrame'.
+    WindowFrame frame_spec = 4;
+
+    // The window frame
+    message WindowFrame {
+
+      // (Required) The type of the frame.
+      FrameType frame_type = 1;
+
+      // (Required) The lower bound of the frame.
+      FrameBoundary lower = 2;
+
+      // (Required) The upper bound of the frame.
+      FrameBoundary upper = 3;
+
+      enum FrameType {
+        // RowFrame treats rows in a partition individually.
+        ROW_FRAME = 0;
+
+        // RangeFrame treats rows in a partition as groups of peers.
+        // All rows having the same 'ORDER BY' ordering are considered as peers.
+        RANGE_FRAME = 1;
+      }

Review Comment:
   ```suggestion
         enum FrameType {
           FRAME_TYPE_UNDEFINED = 0;
           // RowFrame treats rows in a partition individually.
           FRAME_TYPE_ROW = 1;
   
           // RangeFrame treats rows in a partition as groups of peers.
           // All rows having the same 'ORDER BY' ordering are considered as peers.
           FRAME_TYPE_RANGE = 2;
         }
   ```



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -623,6 +625,16 @@ class SparkConnectPlanner(session: SparkSession) {
         val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
         Some(In(children.head, children.tail))
 
+      case "nth_value" if fun.getArgumentsCount == 3 =>

Review Comment:
   That's unfortunate. Is this not worth fixing?



##########
connector/connect/common/src/main/protobuf/spark/connect/expressions.proto:
##########
@@ -39,6 +39,63 @@ message Expression {
     UnresolvedRegex unresolved_regex = 8;
     SortOrder sort_order = 9;
     LambdaFunction lambda_function = 10;
+    Window window = 11;
+  }
+
+
+  // Expression for the OVER clause or WINDOW clause.
+  message Window {
+
+    // (Required) The window function.
+    Expression window_function = 1;
+
+    // (Optional) The way that input rows are partitioned.
+    repeated Expression partition_spec = 2;
+
+    // (Optional) Ordering of rows in a partition.
+    repeated SortOrder order_spec = 3;
+
+    // (Optional) Window frame in a partition.
+    //
+    // If not set, it will be treated as 'UnspecifiedFrame'.
+    WindowFrame frame_spec = 4;

Review Comment:
   ```suggestion
       optional WindowFrame frame_spec = 4;
   ```



##########
python/pyspark/sql/tests/connect/test_connect_column.py:
##########
@@ -347,7 +347,6 @@ def test_unsupported_functions(self):
         # SPARK-41225: Disable unsupported functions.
         c = self.connect.range(1).id
         for f in (
-            "over",

Review Comment:
   👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org