You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by icexelloss <gi...@git.apache.org> on 2018/09/17 22:09:23 UTC
[GitHub] spark pull request #22305: [WIP][SPARK-24561][SQL][Python] User-defined wind...
Github user icexelloss commented on a diff in the pull request:
https://github.com/apache/spark/pull/22305#discussion_r218243887
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.window
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType, TimestampType}
+
+private[sql] abstract class WindowExecBase(
+ windowExpression: Seq[NamedExpression],
+ partitionSpec: Seq[Expression],
+ orderSpec: Seq[SortOrder],
+ child: SparkPlan) extends UnaryExecNode {
+
+ /**
+ * Create the resulting projection.
+ *
+ * This method uses Code Generation. It can only be used on the executor side.
+ *
+ * @param expressions unbound ordered function expressions.
+ * @return the final resulting projection.
+ */
+ protected def createResultProjection(expressions: Seq[Expression]): UnsafeProjection = {
+ val references = expressions.zipWithIndex.map { case (e, i) =>
+ // Results of window expressions will be on the right side of child's output
+ BoundReference(child.output.size + i, e.dataType, e.nullable)
+ }
+ val unboundToRefMap = expressions.zip(references).toMap
+ val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap))
+ UnsafeProjection.create(
+ child.output ++ patchedWindowExpression,
+ child.output)
+ }
+
+ /**
+ * Create a bound ordering object for a given frame type and offset. A bound ordering object is
+ * used to determine which input row lies within the frame boundaries of an output row.
+ *
+ * This method uses Code Generation. It can only be used on the executor side.
+ *
+ * @param frame to evaluate. This can either be a Row or Range frame.
+ * @param bound with respect to the row.
+ * @param timeZone the session local timezone for time related calculations.
+ * @return a bound ordering object.
+ */
+ protected def createBoundOrdering(
+ frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = {
+ (frame, bound) match {
+ case (RowFrame, CurrentRow) =>
+ RowBoundOrdering(0)
+
+ case (RowFrame, IntegerLiteral(offset)) =>
+ RowBoundOrdering(offset)
+
+ case (RangeFrame, CurrentRow) =>
+ val ordering = newOrdering(orderSpec, child.output)
+ RangeBoundOrdering(ordering, IdentityProjection, IdentityProjection)
+
+ case (RangeFrame, offset: Expression) if orderSpec.size == 1 =>
+ // Use only the first order expression when the offset is non-null.
+ val sortExpr = orderSpec.head
+ val expr = sortExpr.child
+
+ // Create the projection which returns the current 'value'.
+ val current = newMutableProjection(expr :: Nil, child.output)
+
+ // Flip the sign of the offset when processing the order is descending
+ val boundOffset = sortExpr.direction match {
+ case Descending => UnaryMinus(offset)
+ case Ascending => offset
+ }
+
+ // Create the projection which returns the current 'value' modified by adding the offset.
+ val boundExpr = (expr.dataType, boundOffset.dataType) match {
+ case (DateType, IntegerType) => DateAdd(expr, boundOffset)
+ case (TimestampType, CalendarIntervalType) =>
+ TimeAdd(expr, boundOffset, Some(timeZone))
+ case (a, b) if a== b => Add(expr, boundOffset)
+ }
+ val bound = newMutableProjection(boundExpr :: Nil, child.output)
+
+ // Construct the ordering. This is used to compare the result of current value projection
+ // to the result of bound value projection. This is done manually because we want to use
+ // Code Generation (if it is enabled).
+ val boundSortExprs = sortExpr.copy(BoundReference(0, expr.dataType, expr.nullable)) :: Nil
+ val ordering = newOrdering(boundSortExprs, Nil)
+ RangeBoundOrdering(ordering, current, bound)
+
+ case (RangeFrame, _) =>
+ sys.error("Non-Zero range offsets are not supported for windows " +
+ "with multiple order expressions.")
+ }
+ }
+
+ /**
+ * Collection containing an entry for each window frame to process. Each entry contains a frame's
+ * [[WindowExpression]]s and factory function for the WindowFrameFunction.
+ */
+ protected lazy val windowFrameExpressionFactoryPairs = {
+ type FrameKey = (String, FrameType, Expression, Expression)
+ type ExpressionBuffer = mutable.Buffer[Expression]
+ val framedFunctions = mutable.Map.empty[FrameKey, (ExpressionBuffer, ExpressionBuffer)]
+
+ // Add a function and its function to the map for a given frame.
+ def collect(tpe: String, fr: SpecifiedWindowFrame, e: Expression, fn: Expression): Unit = {
+ val key = (tpe, fr.frameType, fr.lower, fr.upper)
+ val (es, fns) = framedFunctions.getOrElseUpdate(
+ key, (ArrayBuffer.empty[Expression], ArrayBuffer.empty[Expression]))
+ es += e
+ fns += fn
+ }
+
+ // Collect all valid window functions and group them by their frame.
+ windowExpression.foreach { x =>
+ x.foreach {
+ case e @ WindowExpression(function, spec) =>
+ val frame = spec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
+ function match {
+ case AggregateExpression(f, _, _, _) => collect("AGGREGATE", frame, e, f)
+ case f: AggregateWindowFunction => collect("AGGREGATE", frame, e, f)
+ case f: OffsetWindowFunction => collect("OFFSET", frame, e, f)
+ case f: PythonUDF if PythonUDF.isGroupedAggPandasUDF(f) =>
--- End diff --
This bit is new
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org