You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2015/07/20 01:30:05 UTC
spark git commit: [SPARK-8638] [SQL] Window Function Performance Improvements - Cleanup
Repository: spark
Updated Branches:
refs/heads/master a803ac3e0 -> 7a8124534
[SPARK-8638] [SQL] Window Function Performance Improvements - Cleanup
This PR contains a few clean-ups that are part of SPARK-8638: a few style issues were fixed, and a few tests were moved (from sql/hive/.../execution/WindowSuite.scala into HiveDataFrameWindowSuite.scala).
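Note: the squashed commit message below is wrong. It describes a performance-regression fix for shrinking window frames, not this clean-up.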
Author: Herman van Hovell <hv...@questtec.nl>
Closes #7513 from hvanhovell/SPARK-8638-cleanup and squashes the following commits:
4e69d08 [Herman van Hovell] Fixed Performance Regression for Shrinking Window Frames (+Rebase)
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a812453
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a812453
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a812453
Branch: refs/heads/master
Commit: 7a81245345f2d6124423161786bb0d9f1c278ab8
Parents: a803ac3
Author: Herman van Hovell <hv...@questtec.nl>
Authored: Sun Jul 19 16:29:50 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Sun Jul 19 16:29:50 2015 -0700
----------------------------------------------------------------------
.../org/apache/spark/sql/execution/Window.scala | 14 ++--
.../sql/hive/HiveDataFrameWindowSuite.scala | 43 +++++++++++
.../spark/sql/hive/execution/WindowSuite.scala | 79 --------------------
3 files changed, 51 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/7a812453/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index a054f52..de04132 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -118,22 +118,24 @@ case class Window(
val exprs = windowSpec.orderSpec.map(_.child)
val projection = newMutableProjection(exprs, child.output)
(windowSpec.orderSpec, projection(), projection())
- }
- else if (windowSpec.orderSpec.size == 1) {
+ } else if (windowSpec.orderSpec.size == 1) {
// Use only the first order expression when the offset is non-null.
val sortExpr = windowSpec.orderSpec.head
val expr = sortExpr.child
// Create the projection which returns the current 'value'.
val current = newMutableProjection(expr :: Nil, child.output)()
// Flip the sign of the offset when processing the order is descending
- val boundOffset = if (sortExpr.direction == Descending) -offset
- else offset
+ val boundOffset =
+ if (sortExpr.direction == Descending) {
+ -offset
+ } else {
+ offset
+ }
// Create the projection which returns the current 'value' modified by adding the offset.
val boundExpr = Add(expr, Cast(Literal.create(boundOffset, IntegerType), expr.dataType))
val bound = newMutableProjection(boundExpr :: Nil, child.output)()
(sortExpr :: Nil, current, bound)
- }
- else {
+ } else {
sys.error("Non-Zero range offsets are not supported for windows " +
"with multiple order expressions.")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/7a812453/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
index 15b5f41..c177cbd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
@@ -212,4 +212,47 @@ class HiveDataFrameWindowSuite extends QueryTest {
| (PARTITION BY value ORDER BY key RANGE BETWEEN 1 preceding and current row)
| FROM window_table""".stripMargin).collect())
}
+
+ test("reverse sliding range frame") {
+ val df = Seq(
+ (1, "Thin", "Cell Phone", 6000),
+ (2, "Normal", "Tablet", 1500),
+ (3, "Mini", "Tablet", 5500),
+ (4, "Ultra thin", "Cell Phone", 5500),
+ (5, "Very thin", "Cell Phone", 6000),
+ (6, "Big", "Tablet", 2500),
+ (7, "Bendable", "Cell Phone", 3000),
+ (8, "Foldable", "Cell Phone", 3000),
+ (9, "Pro", "Tablet", 4500),
+ (10, "Pro2", "Tablet", 6500)).
+ toDF("id", "product", "category", "revenue")
+ val window = Window.
+ partitionBy($"category").
+ orderBy($"revenue".desc).
+ rangeBetween(-2000L, 1000L)
+ checkAnswer(
+ df.select(
+ $"id",
+ avg($"revenue").over(window).cast("int")),
+ Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) ::
+ Row(4, 5833) :: Row(5, 5833) :: Row(6, 2833) ::
+ Row(7, 3000) :: Row(8, 3000) :: Row(9, 5500) ::
+ Row(10, 6000) :: Nil)
+ }
+
+ // This is here to illustrate the fact that reverse order also reverses offsets.
+ test("reverse unbounded range frame") {
+ val df = Seq(1, 2, 4, 3, 2, 1).
+ map(Tuple1.apply).
+ toDF("value")
+ val window = Window.orderBy($"value".desc)
+ checkAnswer(
+ df.select(
+ $"value",
+ sum($"value").over(window.rangeBetween(Long.MinValue, 1)),
+ sum($"value").over(window.rangeBetween(1, Long.MaxValue))),
+ Row(1, 13, null) :: Row(2, 13, 2) :: Row(4, 7, 9) ::
+ Row(3, 11, 6) :: Row(2, 13, 2) :: Row(1, 13, null) :: Nil)
+
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/7a812453/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala
deleted file mode 100644
index a089d0d..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import org.apache.spark.sql.{Row, QueryTest}
-import org.apache.spark.sql.expressions.Window
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.test.TestHive.implicits._
-
-/**
- * Window expressions are tested extensively by the following test suites:
- * [[org.apache.spark.sql.hive.HiveDataFrameWindowSuite]]
- * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryWithoutCodeGenSuite]]
- * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryFileWithoutCodeGenSuite]]
- * However these suites do not cover all possible (i.e. more exotic) settings. This suite fill
- * this gap.
- *
- * TODO Move this class to the sql/core project when we move to Native Spark UDAFs.
- */
-class WindowSuite extends QueryTest {
-
- test("reverse sliding range frame") {
- val df = Seq(
- (1, "Thin", "Cell Phone", 6000),
- (2, "Normal", "Tablet", 1500),
- (3, "Mini", "Tablet", 5500),
- (4, "Ultra thin", "Cell Phone", 5500),
- (5, "Very thin", "Cell Phone", 6000),
- (6, "Big", "Tablet", 2500),
- (7, "Bendable", "Cell Phone", 3000),
- (8, "Foldable", "Cell Phone", 3000),
- (9, "Pro", "Tablet", 4500),
- (10, "Pro2", "Tablet", 6500)).
- toDF("id", "product", "category", "revenue")
- val window = Window.
- partitionBy($"category").
- orderBy($"revenue".desc).
- rangeBetween(-2000L, 1000L)
- checkAnswer(
- df.select(
- $"id",
- avg($"revenue").over(window).cast("int")),
- Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) ::
- Row(4, 5833) :: Row(5, 5833) :: Row(6, 2833) ::
- Row(7, 3000) :: Row(8, 3000) :: Row(9, 5500) ::
- Row(10, 6000) :: Nil)
- }
-
- // This is here to illustrate the fact that reverse order also reverses offsets.
- test("reverse unbounded range frame") {
- val df = Seq(1, 2, 4, 3, 2, 1).
- map(Tuple1.apply).
- toDF("value")
- val window = Window.orderBy($"value".desc)
- checkAnswer(
- df.select(
- $"value",
- sum($"value").over(window.rangeBetween(Long.MinValue, 1)),
- sum($"value").over(window.rangeBetween(1, Long.MaxValue))),
- Row(1, 13, null) :: Row(2, 13, 2) :: Row(4, 7, 9) ::
- Row(3, 11, 6) :: Row(2, 13, 2) :: Row(1, 13, null) :: Nil)
-
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org