You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/04/23 07:50:35 UTC

spark git commit: [SPARK-14866][SQL] Break SQLQuerySuite out into smaller test suites

Repository: spark
Updated Branches:
  refs/heads/master bdde010ed -> 95faa731c


[SPARK-14866][SQL] Break SQLQuerySuite out into smaller test suites

## What changes were proposed in this pull request?
This patch breaks SQLQuerySuite out into smaller test suites. It was a little bit too large for debugging.

## How was this patch tested?
This is a test only change.

Author: Reynold Xin <rx...@databricks.com>

Closes #12630 from rxin/SPARK-14866.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95faa731
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95faa731
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95faa731

Branch: refs/heads/master
Commit: 95faa731c15ce2e36373071a405207165818df97
Parents: bdde010
Author: Reynold Xin <rx...@databricks.com>
Authored: Fri Apr 22 22:50:32 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Fri Apr 22 22:50:32 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |   6 +-
 .../sql/hive/execution/SQLQuerySuite.scala      | 509 -------------------
 .../spark/sql/hive/execution/SQLViewSuite.scala | 199 ++++++++
 .../hive/execution/SQLWindowFunctionSuite.scala | 370 ++++++++++++++
 4 files changed, 572 insertions(+), 512 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/95faa731/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 182e459..24558d5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -407,7 +407,7 @@ class Analyzer(
    * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
    */
   object ResolveRelations extends Rule[LogicalPlan] {
-    private def getTable(u: UnresolvedRelation): LogicalPlan = {
+    private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan = {
       try {
         catalog.lookupRelation(u.tableIdentifier, u.alias)
       } catch {
@@ -418,10 +418,10 @@ class Analyzer(
 
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
       case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
-        i.copy(table = EliminateSubqueryAliases(getTable(u)))
+        i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
       case u: UnresolvedRelation =>
         try {
-          getTable(u)
+          lookupTableFromCatalog(u)
         } catch {
           case _: AnalysisException if u.tableIdentifier.database.isDefined =>
             // delay the exception into CheckAnalysis, then it could be resolved as data source.

http://git-wip-us.apache.org/repos/asf/spark/blob/95faa731/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 80d54f0..0ecc980 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -49,11 +49,6 @@ case class Order(
     state: String,
     month: Int)
 
-case class WindowData(
-    month: Int,
-    area: String,
-    product: Int)
-
 /**
  * A collection of hive query tests where we generate the answers ourselves instead of depending on
  * Hive to generate them (in contrast to HiveQuerySuite).  Often this is because the query is
@@ -812,197 +807,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         """.stripMargin), (2 to 6).map(i => Row(i)))
   }
 
-  test("window function: udaf with aggregate expression") {
-    val data = Seq(
-      WindowData(1, "a", 5),
-      WindowData(2, "a", 6),
-      WindowData(3, "b", 7),
-      WindowData(4, "b", 8),
-      WindowData(5, "c", 9),
-      WindowData(6, "c", 10)
-    )
-    sparkContext.parallelize(data).toDF().registerTempTable("windowData")
-
-    checkAnswer(
-      sql(
-        """
-          |select area, sum(product), sum(sum(product)) over (partition by area)
-          |from windowData group by month, area
-        """.stripMargin),
-      Seq(
-        ("a", 5, 11),
-        ("a", 6, 11),
-        ("b", 7, 15),
-        ("b", 8, 15),
-        ("c", 9, 19),
-        ("c", 10, 19)
-      ).map(i => Row(i._1, i._2, i._3)))
-
-    checkAnswer(
-      sql(
-        """
-          |select area, sum(product) - 1, sum(sum(product)) over (partition by area)
-          |from windowData group by month, area
-        """.stripMargin),
-      Seq(
-        ("a", 4, 11),
-        ("a", 5, 11),
-        ("b", 6, 15),
-        ("b", 7, 15),
-        ("c", 8, 19),
-        ("c", 9, 19)
-      ).map(i => Row(i._1, i._2, i._3)))
-
-    checkAnswer(
-      sql(
-        """
-          |select area, sum(product), sum(product) / sum(sum(product)) over (partition by area)
-          |from windowData group by month, area
-        """.stripMargin),
-      Seq(
-        ("a", 5, 5d/11),
-        ("a", 6, 6d/11),
-        ("b", 7, 7d/15),
-        ("b", 8, 8d/15),
-        ("c", 10, 10d/19),
-        ("c", 9, 9d/19)
-      ).map(i => Row(i._1, i._2, i._3)))
-
-    checkAnswer(
-      sql(
-        """
-          |select area, sum(product), sum(product) / sum(sum(product) - 1) over (partition by area)
-          |from windowData group by month, area
-        """.stripMargin),
-      Seq(
-        ("a", 5, 5d/9),
-        ("a", 6, 6d/9),
-        ("b", 7, 7d/13),
-        ("b", 8, 8d/13),
-        ("c", 10, 10d/17),
-        ("c", 9, 9d/17)
-      ).map(i => Row(i._1, i._2, i._3)))
-  }
-
-  test("window function: refer column in inner select block") {
-    val data = Seq(
-      WindowData(1, "a", 5),
-      WindowData(2, "a", 6),
-      WindowData(3, "b", 7),
-      WindowData(4, "b", 8),
-      WindowData(5, "c", 9),
-      WindowData(6, "c", 10)
-    )
-    sparkContext.parallelize(data).toDF().registerTempTable("windowData")
-
-    checkAnswer(
-      sql(
-        """
-          |select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1
-          |from (select month, area, product, 1 as tmp1 from windowData) tmp
-        """.stripMargin),
-      Seq(
-        ("a", 2),
-        ("a", 3),
-        ("b", 2),
-        ("b", 3),
-        ("c", 2),
-        ("c", 3)
-      ).map(i => Row(i._1, i._2)))
-  }
-
-  test("window function: partition and order expressions") {
-    val data = Seq(
-      WindowData(1, "a", 5),
-      WindowData(2, "a", 6),
-      WindowData(3, "b", 7),
-      WindowData(4, "b", 8),
-      WindowData(5, "c", 9),
-      WindowData(6, "c", 10)
-    )
-    sparkContext.parallelize(data).toDF().registerTempTable("windowData")
-
-    checkAnswer(
-      sql(
-        """
-          |select month, area, product, sum(product + 1) over (partition by 1 order by 2)
-          |from windowData
-        """.stripMargin),
-      Seq(
-        (1, "a", 5, 51),
-        (2, "a", 6, 51),
-        (3, "b", 7, 51),
-        (4, "b", 8, 51),
-        (5, "c", 9, 51),
-        (6, "c", 10, 51)
-      ).map(i => Row(i._1, i._2, i._3, i._4)))
-
-    checkAnswer(
-      sql(
-        """
-          |select month, area, product, sum(product)
-          |over (partition by month % 2 order by 10 - product)
-          |from windowData
-        """.stripMargin),
-      Seq(
-        (1, "a", 5, 21),
-        (2, "a", 6, 24),
-        (3, "b", 7, 16),
-        (4, "b", 8, 18),
-        (5, "c", 9, 9),
-        (6, "c", 10, 10)
-      ).map(i => Row(i._1, i._2, i._3, i._4)))
-  }
-
-  test("window function: distinct should not be silently ignored") {
-    val data = Seq(
-      WindowData(1, "a", 5),
-      WindowData(2, "a", 6),
-      WindowData(3, "b", 7),
-      WindowData(4, "b", 8),
-      WindowData(5, "c", 9),
-      WindowData(6, "c", 10)
-    )
-    sparkContext.parallelize(data).toDF().registerTempTable("windowData")
-
-    val e = intercept[AnalysisException] {
-      sql(
-        """
-          |select month, area, product, sum(distinct product + 1) over (partition by 1 order by 2)
-          |from windowData
-        """.stripMargin)
-    }
-    assert(e.getMessage.contains("Distinct window functions are not supported"))
-  }
-
-  test("window function: expressions in arguments of a window functions") {
-    val data = Seq(
-      WindowData(1, "a", 5),
-      WindowData(2, "a", 6),
-      WindowData(3, "b", 7),
-      WindowData(4, "b", 8),
-      WindowData(5, "c", 9),
-      WindowData(6, "c", 10)
-    )
-    sparkContext.parallelize(data).toDF().registerTempTable("windowData")
-
-    checkAnswer(
-      sql(
-        """
-          |select month, area, month % 2,
-          |lag(product, 1 + 1, product) over (partition by month % 2 order by area)
-          |from windowData
-        """.stripMargin),
-      Seq(
-        (1, "a", 1, 5),
-        (2, "a", 0, 6),
-        (3, "b", 1, 7),
-        (4, "b", 0, 8),
-        (5, "c", 1, 5),
-        (6, "c", 0, 6)
-      ).map(i => Row(i._1, i._2, i._3, i._4)))
-  }
-
   test("Sorting columns are not in Generate") {
     withTempTable("data") {
       sqlContext.range(1, 5)
@@ -1030,139 +834,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
-  test("window function: Sorting columns are not in Project") {
-    val data = Seq(
-      WindowData(1, "d", 10),
-      WindowData(2, "a", 6),
-      WindowData(3, "b", 7),
-      WindowData(4, "b", 8),
-      WindowData(5, "c", 9),
-      WindowData(6, "c", 11)
-    )
-    sparkContext.parallelize(data).toDF().registerTempTable("windowData")
-
-    checkAnswer(
-      sql("select month, product, sum(product + 1) over() from windowData order by area"),
-      Seq(
-        (2, 6, 57),
-        (3, 7, 57),
-        (4, 8, 57),
-        (5, 9, 57),
-        (6, 11, 57),
-        (1, 10, 57)
-      ).map(i => Row(i._1, i._2, i._3)))
-
-    checkAnswer(
-      sql(
-        """
-          |select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1
-          |from (select month, area, product as p, 1 as tmp1 from windowData) tmp order by p
-        """.stripMargin),
-      Seq(
-        ("a", 2),
-        ("b", 2),
-        ("b", 3),
-        ("c", 2),
-        ("d", 2),
-        ("c", 3)
-      ).map(i => Row(i._1, i._2)))
-
-    checkAnswer(
-      sql(
-      """
-        |select area, rank() over (partition by area order by month) as c1
-        |from windowData group by product, area, month order by product, area
-      """.stripMargin),
-      Seq(
-        ("a", 1),
-        ("b", 1),
-        ("b", 2),
-        ("c", 1),
-        ("d", 1),
-        ("c", 2)
-      ).map(i => Row(i._1, i._2)))
-
-    checkAnswer(
-      sql(
-        """
-          |select area, sum(product) / sum(sum(product)) over (partition by area) as c1
-          |from windowData group by area, month order by month, c1
-        """.stripMargin),
-      Seq(
-        ("d", 1.0),
-        ("a", 1.0),
-        ("b", 0.4666666666666667),
-        ("b", 0.5333333333333333),
-        ("c", 0.45),
-        ("c", 0.55)
-      ).map(i => Row(i._1, i._2)))
-  }
-
-  // todo: fix this test case by reimplementing the function ResolveAggregateFunctions
-  ignore("window function: Pushing aggregate Expressions in Sort to Aggregate") {
-    val data = Seq(
-      WindowData(1, "d", 10),
-      WindowData(2, "a", 6),
-      WindowData(3, "b", 7),
-      WindowData(4, "b", 8),
-      WindowData(5, "c", 9),
-      WindowData(6, "c", 11)
-    )
-    sparkContext.parallelize(data).toDF().registerTempTable("windowData")
-
-    checkAnswer(
-      sql(
-        """
-          |select area, sum(product) over () as c from windowData
-          |where product > 3 group by area, product
-          |having avg(month) > 0 order by avg(month), product
-        """.stripMargin),
-      Seq(
-        ("a", 51),
-        ("b", 51),
-        ("b", 51),
-        ("c", 51),
-        ("c", 51),
-        ("d", 51)
-      ).map(i => Row(i._1, i._2)))
-  }
-
-  test("window function: multiple window expressions in a single expression") {
-    val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y")
-    nums.registerTempTable("nums")
-
-    val expected =
-      Row(1, 1, 1, 55, 1, 57) ::
-      Row(0, 2, 3, 55, 2, 60) ::
-      Row(1, 3, 6, 55, 4, 65) ::
-      Row(0, 4, 10, 55, 6, 71) ::
-      Row(1, 5, 15, 55, 9, 79) ::
-      Row(0, 6, 21, 55, 12, 88) ::
-      Row(1, 7, 28, 55, 16, 99) ::
-      Row(0, 8, 36, 55, 20, 111) ::
-      Row(1, 9, 45, 55, 25, 125) ::
-      Row(0, 10, 55, 55, 30, 140) :: Nil
-
-    val actual = sql(
-      """
-        |SELECT
-        |  y,
-        |  x,
-        |  sum(x) OVER w1 AS running_sum,
-        |  sum(x) OVER w2 AS total_sum,
-        |  sum(x) OVER w3 AS running_sum_per_y,
-        |  ((sum(x) OVER w1) + (sum(x) OVER w2) + (sum(x) OVER w3)) as combined2
-        |FROM nums
-        |WINDOW w1 AS (ORDER BY x ROWS BETWEEN UnBOUNDED PRECEDiNG AND CuRRENT RoW),
-        |       w2 AS (ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOuNDED FoLLOWING),
-        |       w3 AS (PARTITION BY y ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
-      """.stripMargin)
-
-    checkAnswer(actual, expected)
-
-    dropTempTable("nums")
-  }
-
   test("test case key when") {
     (1 to 5).map(i => (i, i.toString)).toDF("k", "v").registerTempTable("t")
     checkAnswer(
@@ -1170,18 +841,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       Row(0, "1") :: Row(22, "2") :: Row(0, "3") :: Row(44, "4") :: Row(0, "5") :: Nil)
   }
 
-  test("SPARK-7595: Window will cause resolve failed with self join") {
-    sql("SELECT * FROM src") // Force loading of src table.
-
-    checkAnswer(sql(
-      """
-        |with
-        | v1 as (select key, count(value) over (partition by key) cnt_val from src),
-        | v2 as (select v1.key, v1_lag.cnt_val from v1, v1 v1_lag where v1.key = v1_lag.key)
-        | select * from v2 order by key limit 1
-      """.stripMargin), Row(0, 3))
-  }
-
   test("SPARK-7269 Check analysis failed in case in-sensitive") {
     Seq(1, 2, 3).map { i =>
       (i.toString, i.toString)
@@ -1485,174 +1144,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     })
   }
 
-  test("correctly parse CREATE VIEW statement") {
-    withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
-      withTable("jt") {
-        val df = (1 until 10).map(i => i -> i).toDF("i", "j")
-        df.write.format("json").saveAsTable("jt")
-        sql(
-          """CREATE VIEW IF NOT EXISTS
-            |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
-            |TBLPROPERTIES ('a' = 'b')
-            |AS SELECT * FROM jt""".stripMargin)
-        checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
-        sql("DROP VIEW testView")
-      }
-    }
-  }
-
-  test("correctly handle CREATE VIEW IF NOT EXISTS") {
-    withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
-      withTable("jt", "jt2") {
-        sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
-        sql("CREATE VIEW testView AS SELECT id FROM jt")
-
-        val df = (1 until 10).map(i => i -> i).toDF("i", "j")
-        df.write.format("json").saveAsTable("jt2")
-        sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2")
-
-        // make sure our view doesn't change.
-        checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
-        sql("DROP VIEW testView")
-      }
-    }
-  }
-
-  Seq(true, false).foreach { enabled =>
-    val prefix = (if (enabled) "With" else "Without") + " canonical native view: "
-    test(s"$prefix correctly handle CREATE OR REPLACE VIEW") {
-      withSQLConf(
-        SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
-        withTable("jt", "jt2") {
-          sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
-          sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
-          checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
-
-          val df = (1 until 10).map(i => i -> i).toDF("i", "j")
-          df.write.format("json").saveAsTable("jt2")
-          sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2")
-          // make sure the view has been changed.
-          checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
-
-          sql("DROP VIEW testView")
-
-          val e = intercept[AnalysisException] {
-            sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt")
-          }
-          assert(e.message.contains("not allowed to define a view"))
-        }
-      }
-    }
-
-    test(s"$prefix correctly handle ALTER VIEW") {
-      withSQLConf(
-        SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
-        withTable("jt", "jt2") {
-          withView("testView") {
-            sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
-            sql("CREATE VIEW testView AS SELECT id FROM jt")
-
-            val df = (1 until 10).map(i => i -> i).toDF("i", "j")
-            df.write.format("json").saveAsTable("jt2")
-            sql("ALTER VIEW testView AS SELECT * FROM jt2")
-            // make sure the view has been changed.
-            checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
-          }
-        }
-      }
-    }
-
-    test(s"$prefix create hive view for json table") {
-      // json table is not hive-compatible, make sure the new flag fix it.
-      withSQLConf(
-        SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
-        withTable("jt") {
-          withView("testView") {
-            sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
-            sql("CREATE VIEW testView AS SELECT id FROM jt")
-            checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
-          }
-        }
-      }
-    }
-
-    test(s"$prefix create hive view for partitioned parquet table") {
-      // partitioned parquet table is not hive-compatible, make sure the new flag fix it.
-      withSQLConf(
-        SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
-        withTable("parTable") {
-          withView("testView") {
-            val df = Seq(1 -> "a").toDF("i", "j")
-            df.write.format("parquet").partitionBy("i").saveAsTable("parTable")
-            sql("CREATE VIEW testView AS SELECT i, j FROM parTable")
-            checkAnswer(sql("SELECT * FROM testView"), Row(1, "a"))
-          }
-        }
-      }
-    }
-  }
-
-  test("CTE within view") {
-    withSQLConf(
-      SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
-      withView("cte_view") {
-        sql("CREATE VIEW cte_view AS WITH w AS (SELECT 1 AS n) SELECT n FROM w")
-        checkAnswer(sql("SELECT * FROM cte_view"), Row(1))
-      }
-    }
-  }
-
-  test("Using view after switching current database") {
-    withSQLConf(
-      SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
-      withView("v") {
-        sql("CREATE VIEW v AS SELECT * FROM src")
-        withTempDatabase { db =>
-          activateDatabase(db) {
-            // Should look up table `src` in database `default`.
-            checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
-
-            // The new `src` table shouldn't be scanned.
-            sql("CREATE TABLE src(key INT, value STRING)")
-            checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
-          }
-        }
-      }
-    }
-  }
-
-  test("Using view after adding more columns") {
-    withSQLConf(
-      SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
-      withTable("add_col") {
-        sqlContext.range(10).write.saveAsTable("add_col")
-        withView("v") {
-          sql("CREATE VIEW v AS SELECT * FROM add_col")
-          sqlContext.range(10).select('id, 'id as 'a).write.mode("overwrite").saveAsTable("add_col")
-          checkAnswer(sql("SELECT * FROM v"), sqlContext.range(10).toDF())
-        }
-      }
-    }
-  }
-
-  test("create hive view for joined tables") {
-    // make sure the new flag can handle some complex cases like join and schema change.
-    withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
-      withTable("jt1", "jt2") {
-        sqlContext.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1")
-        sqlContext.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2")
-        sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2")
-        checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
-
-        val df = (1 until 10).map(i => i -> i).toDF("id1", "newCol")
-        df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("jt1")
-        checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
-
-        sql("DROP VIEW testView")
-      }
-    }
-  }
-
   test("SPARK-8976 Wrong Result for Rollup #1") {
     checkAnswer(sql(
       "SELECT count(*) AS cnt, key % 5, grouping_id() FROM src GROUP BY key%5 WITH ROLLUP"),

http://git-wip-us.apache.org/repos/asf/spark/blob/95faa731/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
new file mode 100644
index 0000000..cdd5cb3
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * A suite for testing view related functionality.
+ */
+class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+  import hiveContext.implicits._
+
+  test("correctly parse CREATE VIEW statement") {
+    withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
+      withTable("jt") {
+        val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+        df.write.format("json").saveAsTable("jt")
+        sql(
+          """CREATE VIEW IF NOT EXISTS
+            |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
+            |TBLPROPERTIES ('a' = 'b')
+            |AS SELECT * FROM jt""".stripMargin)
+        checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
+        sql("DROP VIEW testView")
+      }
+    }
+  }
+
+  test("correctly handle CREATE VIEW IF NOT EXISTS") {
+    withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
+      withTable("jt", "jt2") {
+        sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+        sql("CREATE VIEW testView AS SELECT id FROM jt")
+
+        val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+        df.write.format("json").saveAsTable("jt2")
+        sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2")
+
+        // make sure our view doesn't change.
+        checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+        sql("DROP VIEW testView")
+      }
+    }
+  }
+
+  Seq(true, false).foreach { enabled =>
+    val prefix = (if (enabled) "With" else "Without") + " canonical native view: "
+    test(s"$prefix correctly handle CREATE OR REPLACE VIEW") {
+      withSQLConf(
+        SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
+        withTable("jt", "jt2") {
+          sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+          sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
+          checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+
+          val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+          df.write.format("json").saveAsTable("jt2")
+          sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2")
+          // make sure the view has been changed.
+          checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
+
+          sql("DROP VIEW testView")
+
+          val e = intercept[AnalysisException] {
+            sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt")
+          }
+          assert(e.message.contains("not allowed to define a view"))
+        }
+      }
+    }
+
+    test(s"$prefix correctly handle ALTER VIEW") {
+      withSQLConf(
+        SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
+        withTable("jt", "jt2") {
+          withView("testView") {
+            sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+            sql("CREATE VIEW testView AS SELECT id FROM jt")
+
+            val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+            df.write.format("json").saveAsTable("jt2")
+            sql("ALTER VIEW testView AS SELECT * FROM jt2")
+            // make sure the view has been changed.
+            checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
+          }
+        }
+      }
+    }
+
+    test(s"$prefix create hive view for json table") {
+      // json table is not hive-compatible, make sure the new flag fix it.
+      withSQLConf(
+        SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
+        withTable("jt") {
+          withView("testView") {
+            sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+            sql("CREATE VIEW testView AS SELECT id FROM jt")
+            checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+          }
+        }
+      }
+    }
+
+    test(s"$prefix create hive view for partitioned parquet table") {
+      // partitioned parquet table is not hive-compatible, make sure the new flag fix it.
+      withSQLConf(
+        SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
+        withTable("parTable") {
+          withView("testView") {
+            val df = Seq(1 -> "a").toDF("i", "j")
+            df.write.format("parquet").partitionBy("i").saveAsTable("parTable")
+            sql("CREATE VIEW testView AS SELECT i, j FROM parTable")
+            checkAnswer(sql("SELECT * FROM testView"), Row(1, "a"))
+          }
+        }
+      }
+    }
+  }
+
+  test("CTE within view") {
+    withSQLConf(
+      SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
+      withView("cte_view") {
+        sql("CREATE VIEW cte_view AS WITH w AS (SELECT 1 AS n) SELECT n FROM w")
+        checkAnswer(sql("SELECT * FROM cte_view"), Row(1))
+      }
+    }
+  }
+
+  test("Using view after switching current database") {
+    withSQLConf(
+      SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
+      withView("v") {
+        sql("CREATE VIEW v AS SELECT * FROM src")
+        withTempDatabase { db =>
+          activateDatabase(db) {
+            // Should look up table `src` in database `default`.
+            checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
+
+            // The new `src` table shouldn't be scanned.
+            sql("CREATE TABLE src(key INT, value STRING)")
+            checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
+          }
+        }
+      }
+    }
+  }
+
+  test("Using view after adding more columns") {
+    withSQLConf(
+      SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
+      withTable("add_col") {
+        sqlContext.range(10).write.saveAsTable("add_col")
+        withView("v") {
+          sql("CREATE VIEW v AS SELECT * FROM add_col")
+          sqlContext.range(10).select('id, 'id as 'a).write.mode("overwrite").saveAsTable("add_col")
+          checkAnswer(sql("SELECT * FROM v"), sqlContext.range(10).toDF())
+        }
+      }
+    }
+  }
+
+  test("create hive view for joined tables") {
+    // make sure the new flag can handle some complex cases like join and schema change.
+    withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
+      withTable("jt1", "jt2") {
+        sqlContext.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1")
+        sqlContext.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2")
+        sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2")
+        checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
+
+        val df = (1 until 10).map(i => i -> i).toDF("id1", "newCol")
+        df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("jt1")
+        checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
+
+        sql("DROP VIEW testView")
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/95faa731/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala
new file mode 100644
index 0000000..d0e7552
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+
+case class WindowData(month: Int, area: String, product: Int)
+
+
+/**
+ * Test suite for SQL window functions.
+ */
+class SQLWindowFunctionSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+  import hiveContext.implicits._
+
+  test("window function: udaf with aggregate expression") {
+    val data = Seq(
+      WindowData(1, "a", 5),
+      WindowData(2, "a", 6),
+      WindowData(3, "b", 7),
+      WindowData(4, "b", 8),
+      WindowData(5, "c", 9),
+      WindowData(6, "c", 10)
+    )
+    sparkContext.parallelize(data).toDF().registerTempTable("windowData")
+
+    checkAnswer(
+      sql(
+        """
+          |select area, sum(product), sum(sum(product)) over (partition by area)
+          |from windowData group by month, area
+        """.stripMargin),
+      Seq(
+        ("a", 5, 11),
+        ("a", 6, 11),
+        ("b", 7, 15),
+        ("b", 8, 15),
+        ("c", 9, 19),
+        ("c", 10, 19)
+      ).map(i => Row(i._1, i._2, i._3)))
+
+    checkAnswer(
+      sql(
+        """
+          |select area, sum(product) - 1, sum(sum(product)) over (partition by area)
+          |from windowData group by month, area
+        """.stripMargin),
+      Seq(
+        ("a", 4, 11),
+        ("a", 5, 11),
+        ("b", 6, 15),
+        ("b", 7, 15),
+        ("c", 8, 19),
+        ("c", 9, 19)
+      ).map(i => Row(i._1, i._2, i._3)))
+
+    checkAnswer(
+      sql(
+        """
+          |select area, sum(product), sum(product) / sum(sum(product)) over (partition by area)
+          |from windowData group by month, area
+        """.stripMargin),
+      Seq(
+        ("a", 5, 5d/11),
+        ("a", 6, 6d/11),
+        ("b", 7, 7d/15),
+        ("b", 8, 8d/15),
+        ("c", 10, 10d/19),
+        ("c", 9, 9d/19)
+      ).map(i => Row(i._1, i._2, i._3)))
+
+    checkAnswer(
+      sql(
+        """
+          |select area, sum(product), sum(product) / sum(sum(product) - 1) over (partition by area)
+          |from windowData group by month, area
+        """.stripMargin),
+      Seq(
+        ("a", 5, 5d/9),
+        ("a", 6, 6d/9),
+        ("b", 7, 7d/13),
+        ("b", 8, 8d/13),
+        ("c", 10, 10d/17),
+        ("c", 9, 9d/17)
+      ).map(i => Row(i._1, i._2, i._3)))
+  }
+
+  test("window function: refer column in inner select block") {
+    val data = Seq(
+      WindowData(1, "a", 5),
+      WindowData(2, "a", 6),
+      WindowData(3, "b", 7),
+      WindowData(4, "b", 8),
+      WindowData(5, "c", 9),
+      WindowData(6, "c", 10)
+    )
+    sparkContext.parallelize(data).toDF().registerTempTable("windowData")
+
+    checkAnswer(
+      sql(
+        """
+          |select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1
+          |from (select month, area, product, 1 as tmp1 from windowData) tmp
+        """.stripMargin),
+      Seq(
+        ("a", 2),
+        ("a", 3),
+        ("b", 2),
+        ("b", 3),
+        ("c", 2),
+        ("c", 3)
+      ).map(i => Row(i._1, i._2)))
+  }
+
+  test("window function: partition and order expressions") {
+    val data = Seq(
+      WindowData(1, "a", 5),
+      WindowData(2, "a", 6),
+      WindowData(3, "b", 7),
+      WindowData(4, "b", 8),
+      WindowData(5, "c", 9),
+      WindowData(6, "c", 10)
+    )
+    sparkContext.parallelize(data).toDF().registerTempTable("windowData")
+
+    checkAnswer(
+      sql(
+        """
+          |select month, area, product, sum(product + 1) over (partition by 1 order by 2)
+          |from windowData
+        """.stripMargin),
+      Seq(
+        (1, "a", 5, 51),
+        (2, "a", 6, 51),
+        (3, "b", 7, 51),
+        (4, "b", 8, 51),
+        (5, "c", 9, 51),
+        (6, "c", 10, 51)
+      ).map(i => Row(i._1, i._2, i._3, i._4)))
+
+    checkAnswer(
+      sql(
+        """
+          |select month, area, product, sum(product)
+          |over (partition by month % 2 order by 10 - product)
+          |from windowData
+        """.stripMargin),
+      Seq(
+        (1, "a", 5, 21),
+        (2, "a", 6, 24),
+        (3, "b", 7, 16),
+        (4, "b", 8, 18),
+        (5, "c", 9, 9),
+        (6, "c", 10, 10)
+      ).map(i => Row(i._1, i._2, i._3, i._4)))
+  }
+
+  test("window function: distinct should not be silently ignored") {
+    val data = Seq(
+      WindowData(1, "a", 5),
+      WindowData(2, "a", 6),
+      WindowData(3, "b", 7),
+      WindowData(4, "b", 8),
+      WindowData(5, "c", 9),
+      WindowData(6, "c", 10)
+    )
+    sparkContext.parallelize(data).toDF().registerTempTable("windowData")
+
+    val e = intercept[AnalysisException] {
+      sql(
+        """
+          |select month, area, product, sum(distinct product + 1) over (partition by 1 order by 2)
+          |from windowData
+        """.stripMargin)
+    }
+    assert(e.getMessage.contains("Distinct window functions are not supported"))
+  }
+
+  test("window function: expressions in arguments of a window functions") {
+    val data = Seq(
+      WindowData(1, "a", 5),
+      WindowData(2, "a", 6),
+      WindowData(3, "b", 7),
+      WindowData(4, "b", 8),
+      WindowData(5, "c", 9),
+      WindowData(6, "c", 10)
+    )
+    sparkContext.parallelize(data).toDF().registerTempTable("windowData")
+
+    checkAnswer(
+      sql(
+        """
+          |select month, area, month % 2,
+          |lag(product, 1 + 1, product) over (partition by month % 2 order by area)
+          |from windowData
+        """.stripMargin),
+      Seq(
+        (1, "a", 1, 5),
+        (2, "a", 0, 6),
+        (3, "b", 1, 7),
+        (4, "b", 0, 8),
+        (5, "c", 1, 5),
+        (6, "c", 0, 6)
+      ).map(i => Row(i._1, i._2, i._3, i._4)))
+  }
+
+
+  test("window function: Sorting columns are not in Project") {
+    val data = Seq(
+      WindowData(1, "d", 10),
+      WindowData(2, "a", 6),
+      WindowData(3, "b", 7),
+      WindowData(4, "b", 8),
+      WindowData(5, "c", 9),
+      WindowData(6, "c", 11)
+    )
+    sparkContext.parallelize(data).toDF().registerTempTable("windowData")
+
+    checkAnswer(
+      sql("select month, product, sum(product + 1) over() from windowData order by area"),
+      Seq(
+        (2, 6, 57),
+        (3, 7, 57),
+        (4, 8, 57),
+        (5, 9, 57),
+        (6, 11, 57),
+        (1, 10, 57)
+      ).map(i => Row(i._1, i._2, i._3)))
+
+    checkAnswer(
+      sql(
+        """
+          |select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1
+          |from (select month, area, product as p, 1 as tmp1 from windowData) tmp order by p
+        """.stripMargin),
+      Seq(
+        ("a", 2),
+        ("b", 2),
+        ("b", 3),
+        ("c", 2),
+        ("d", 2),
+        ("c", 3)
+      ).map(i => Row(i._1, i._2)))
+
+    checkAnswer(
+      sql(
+        """
+          |select area, rank() over (partition by area order by month) as c1
+          |from windowData group by product, area, month order by product, area
+        """.stripMargin),
+      Seq(
+        ("a", 1),
+        ("b", 1),
+        ("b", 2),
+        ("c", 1),
+        ("d", 1),
+        ("c", 2)
+      ).map(i => Row(i._1, i._2)))
+
+    checkAnswer(
+      sql(
+        """
+          |select area, sum(product) / sum(sum(product)) over (partition by area) as c1
+          |from windowData group by area, month order by month, c1
+        """.stripMargin),
+      Seq(
+        ("d", 1.0),
+        ("a", 1.0),
+        ("b", 0.4666666666666667),
+        ("b", 0.5333333333333333),
+        ("c", 0.45),
+        ("c", 0.55)
+      ).map(i => Row(i._1, i._2)))
+  }
+
+  // todo: fix this test case by reimplementing the function ResolveAggregateFunctions
+  ignore("window function: Pushing aggregate Expressions in Sort to Aggregate") {
+    val data = Seq(
+      WindowData(1, "d", 10),
+      WindowData(2, "a", 6),
+      WindowData(3, "b", 7),
+      WindowData(4, "b", 8),
+      WindowData(5, "c", 9),
+      WindowData(6, "c", 11)
+    )
+    sparkContext.parallelize(data).toDF().registerTempTable("windowData")
+
+    checkAnswer(
+      sql(
+        """
+          |select area, sum(product) over () as c from windowData
+          |where product > 3 group by area, product
+          |having avg(month) > 0 order by avg(month), product
+        """.stripMargin),
+      Seq(
+        ("a", 51),
+        ("b", 51),
+        ("b", 51),
+        ("c", 51),
+        ("c", 51),
+        ("d", 51)
+      ).map(i => Row(i._1, i._2)))
+  }
+
+  test("window function: multiple window expressions in a single expression") {
+    val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y")
+    nums.registerTempTable("nums")
+
+    val expected =
+      Row(1, 1, 1, 55, 1, 57) ::
+        Row(0, 2, 3, 55, 2, 60) ::
+        Row(1, 3, 6, 55, 4, 65) ::
+        Row(0, 4, 10, 55, 6, 71) ::
+        Row(1, 5, 15, 55, 9, 79) ::
+        Row(0, 6, 21, 55, 12, 88) ::
+        Row(1, 7, 28, 55, 16, 99) ::
+        Row(0, 8, 36, 55, 20, 111) ::
+        Row(1, 9, 45, 55, 25, 125) ::
+        Row(0, 10, 55, 55, 30, 140) :: Nil
+
+    val actual = sql(
+      """
+        |SELECT
+        |  y,
+        |  x,
+        |  sum(x) OVER w1 AS running_sum,
+        |  sum(x) OVER w2 AS total_sum,
+        |  sum(x) OVER w3 AS running_sum_per_y,
+        |  ((sum(x) OVER w1) + (sum(x) OVER w2) + (sum(x) OVER w3)) as combined2
+        |FROM nums
+        |WINDOW w1 AS (ORDER BY x ROWS BETWEEN UnBOUNDED PRECEDiNG AND CuRRENT RoW),
+        |       w2 AS (ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOuNDED FoLLOWING),
+        |       w3 AS (PARTITION BY y ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
+      """.stripMargin)
+
+    checkAnswer(actual, expected)
+
+    sqlContext.dropTempTable("nums")
+  }
+
+  test("SPARK-7595: Window will cause resolve failed with self join") {
+    sql("SELECT * FROM src") // Force loading of src table.
+
+    checkAnswer(sql(
+      """
+        |with
+        | v1 as (select key, count(value) over (partition by key) cnt_val from src),
+        | v2 as (select v1.key, v1_lag.cnt_val from v1, v1 v1_lag where v1.key = v1_lag.key)
+        | select * from v2 order by key limit 1
+      """.stripMargin), Row(0, 3))
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org