Posted to commits@spark.apache.org by yh...@apache.org on 2015/07/20 01:30:05 UTC

spark git commit: [SPARK-8638] [SQL] Window Function Performance Improvements - Cleanup

Repository: spark
Updated Branches:
  refs/heads/master a803ac3e0 -> 7a8124534


[SPARK-8638] [SQL] Window Function Performance Improvements - Cleanup

This PR contains a few clean-ups that are part of SPARK-8638: a few style issues in Window.scala were fixed, and the tests in WindowSuite.scala were moved into HiveDataFrameWindowSuite.scala.

The Git commit message is wrong, BTW :(...

Author: Herman van Hovell <hv...@questtec.nl>

Closes #7513 from hvanhovell/SPARK-8638-cleanup and squashes the following commits:

4e69d08 [Herman van Hovell] Fixed Performance Regression for Shrinking Window Frames (+Rebase)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a812453
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a812453
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a812453

Branch: refs/heads/master
Commit: 7a81245345f2d6124423161786bb0d9f1c278ab8
Parents: a803ac3
Author: Herman van Hovell <hv...@questtec.nl>
Authored: Sun Jul 19 16:29:50 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Sun Jul 19 16:29:50 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/Window.scala | 14 ++--
 .../sql/hive/HiveDataFrameWindowSuite.scala     | 43 +++++++++++
 .../spark/sql/hive/execution/WindowSuite.scala  | 79 --------------------
 3 files changed, 51 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7a812453/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index a054f52..de04132 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -118,22 +118,24 @@ case class Window(
           val exprs = windowSpec.orderSpec.map(_.child)
           val projection = newMutableProjection(exprs, child.output)
           (windowSpec.orderSpec, projection(), projection())
-        }
-        else if (windowSpec.orderSpec.size == 1) {
+        } else if (windowSpec.orderSpec.size == 1) {
           // Use only the first order expression when the offset is non-null.
           val sortExpr = windowSpec.orderSpec.head
           val expr = sortExpr.child
           // Create the projection which returns the current 'value'.
           val current = newMutableProjection(expr :: Nil, child.output)()
           // Flip the sign of the offset when processing the order is descending
-          val boundOffset = if (sortExpr.direction == Descending) -offset
-          else offset
+          val boundOffset =
+            if (sortExpr.direction == Descending) {
+              -offset
+            } else {
+              offset
+            }
           // Create the projection which returns the current 'value' modified by adding the offset.
           val boundExpr = Add(expr, Cast(Literal.create(boundOffset, IntegerType), expr.dataType))
           val bound = newMutableProjection(boundExpr :: Nil, child.output)()
           (sortExpr :: Nil, current, bound)
-        }
-        else {
+        } else {
           sys.error("Non-Zero range offsets are not supported for windows " +
             "with multiple order expressions.")
         }
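
A note on the hunk above (the wording here is mine; the behaviour is unchanged by this clean-up): for a RANGE frame with a non-zero offset and a single ORDER BY expression, the frame bound is computed by adding the offset to the current row's order value, and when the ordering is descending the offset's sign is flipped first, because PRECEDING/FOLLOWING then move in the opposite direction along that expression. A minimal plain-Scala sketch of that rule follows; boundValue and its arguments are illustrative stand-ins, not Spark internals.

  // Illustrative only: mirrors the boundOffset computation in the hunk above.
  // With a descending ordering, a RANGE offset moves in the opposite direction
  // along the order expression, so its sign is flipped before being added to
  // the current row's order value.
  def boundValue(current: Long, offset: Long, descending: Boolean): Long = {
    val boundOffset = if (descending) -offset else offset
    current + boundOffset
  }

  // ORDER BY revenue DESC RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING,
  // evaluated for a row whose revenue is 5500:
  println(boundValue(5500L, -2000L, descending = true)) // 7500: frame starts at revenue <= 7500
  println(boundValue(5500L,  1000L, descending = true)) // 4500: frame ends at revenue >= 4500
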

http://git-wip-us.apache.org/repos/asf/spark/blob/7a812453/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
index 15b5f41..c177cbd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
@@ -212,4 +212,47 @@ class HiveDataFrameWindowSuite extends QueryTest {
           |   (PARTITION BY value ORDER BY key RANGE BETWEEN 1 preceding and current row)
           | FROM window_table""".stripMargin).collect())
   }
+
+  test("reverse sliding range frame") {
+    val df = Seq(
+      (1, "Thin", "Cell Phone", 6000),
+      (2, "Normal", "Tablet", 1500),
+      (3, "Mini", "Tablet", 5500),
+      (4, "Ultra thin", "Cell Phone", 5500),
+      (5, "Very thin", "Cell Phone", 6000),
+      (6, "Big", "Tablet", 2500),
+      (7, "Bendable", "Cell Phone", 3000),
+      (8, "Foldable", "Cell Phone", 3000),
+      (9, "Pro", "Tablet", 4500),
+      (10, "Pro2", "Tablet", 6500)).
+      toDF("id", "product", "category", "revenue")
+    val window = Window.
+      partitionBy($"category").
+      orderBy($"revenue".desc).
+      rangeBetween(-2000L, 1000L)
+    checkAnswer(
+      df.select(
+        $"id",
+        avg($"revenue").over(window).cast("int")),
+      Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) ::
+        Row(4, 5833) :: Row(5, 5833) :: Row(6, 2833) ::
+        Row(7, 3000) :: Row(8, 3000) :: Row(9, 5500) ::
+        Row(10, 6000) :: Nil)
+  }
+
+  // This is here to illustrate the fact that reverse order also reverses offsets.
+  test("reverse unbounded range frame") {
+    val df = Seq(1, 2, 4, 3, 2, 1).
+      map(Tuple1.apply).
+      toDF("value")
+    val window = Window.orderBy($"value".desc)
+    checkAnswer(
+      df.select(
+        $"value",
+        sum($"value").over(window.rangeBetween(Long.MinValue, 1)),
+        sum($"value").over(window.rangeBetween(1, Long.MaxValue))),
+      Row(1, 13, null) :: Row(2, 13, 2) :: Row(4, 7, 9) ::
+        Row(3, 11, 6) :: Row(2, 13, 2) :: Row(1, 13, null) :: Nil)
+
+  }
 }
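
To make the expected values in the moved test easier to verify: with ORDER BY revenue DESC, the rangeBetween(-2000L, 1000L) frame for a row covers the partition's rows whose revenue lies between current - 1000 and current + 2000, since the descending order reverses the direction of both offsets. Below is a quick plain-Scala check of the first expected row, Row(1, 5833); the values are copied from the test data above, and none of this uses the Spark API.

  // "Cell Phone" partition revenues from the test data (ids 1, 4, 5, 7, 8).
  val cellPhoneRevenues = Seq(6000, 5500, 6000, 3000, 3000)
  val current = 6000 // revenue of the row with id = 1

  // Descending order + rangeBetween(-2000, 1000) => revenues in [current - 1000, current + 2000].
  val frame = cellPhoneRevenues.filter(r => r >= current - 1000 && r <= current + 2000)
  println(frame)                           // List(6000, 5500, 6000)
  println(frame.sum.toDouble / frame.size) // 5833.33..., truncated to 5833 by the cast("int") above
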

http://git-wip-us.apache.org/repos/asf/spark/blob/7a812453/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala
deleted file mode 100644
index a089d0d..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import org.apache.spark.sql.{Row, QueryTest}
-import org.apache.spark.sql.expressions.Window
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.test.TestHive.implicits._
-
-/**
- * Window expressions are tested extensively by the following test suites:
- * [[org.apache.spark.sql.hive.HiveDataFrameWindowSuite]]
- * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryWithoutCodeGenSuite]]
- * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryFileWithoutCodeGenSuite]]
- * However these suites do not cover all possible (i.e. more exotic) settings. This suite fill
- * this gap.
- *
- * TODO Move this class to the sql/core project when we move to Native Spark UDAFs.
- */
-class WindowSuite extends QueryTest {
-
-  test("reverse sliding range frame") {
-    val df = Seq(
-      (1, "Thin", "Cell Phone", 6000),
-      (2, "Normal", "Tablet", 1500),
-      (3, "Mini", "Tablet", 5500),
-      (4, "Ultra thin", "Cell Phone", 5500),
-      (5, "Very thin", "Cell Phone", 6000),
-      (6, "Big", "Tablet", 2500),
-      (7, "Bendable", "Cell Phone", 3000),
-      (8, "Foldable", "Cell Phone", 3000),
-      (9, "Pro", "Tablet", 4500),
-      (10, "Pro2", "Tablet", 6500)).
-      toDF("id", "product", "category", "revenue")
-    val window = Window.
-      partitionBy($"category").
-      orderBy($"revenue".desc).
-      rangeBetween(-2000L, 1000L)
-    checkAnswer(
-      df.select(
-        $"id",
-        avg($"revenue").over(window).cast("int")),
-      Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) ::
-      Row(4, 5833) :: Row(5, 5833) :: Row(6, 2833) ::
-      Row(7, 3000) :: Row(8, 3000) :: Row(9, 5500) ::
-      Row(10, 6000) :: Nil)
-  }
-
-  // This is here to illustrate the fact that reverse order also reverses offsets.
-  test("reverse unbounded range frame") {
-    val df = Seq(1, 2, 4, 3, 2, 1).
-      map(Tuple1.apply).
-      toDF("value")
-    val window = Window.orderBy($"value".desc)
-    checkAnswer(
-      df.select(
-        $"value",
-         sum($"value").over(window.rangeBetween(Long.MinValue, 1)),
-         sum($"value").over(window.rangeBetween(1, Long.MaxValue))),
-      Row(1, 13, null) :: Row(2, 13, 2) :: Row(4, 7, 9) ::
-        Row(3, 11, 6) :: Row(2, 13, 2) :: Row(1, 13, null) :: Nil)
-
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org