You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/11/08 02:34:27 UTC

spark git commit: [SPARK-18217][SQL] Disallow creating permanent views based on temporary views or UDFs

Repository: spark
Updated Branches:
  refs/heads/master c1a0c66bd -> 1da64e1fa


[SPARK-18217][SQL] Disallow creating permanent views based on temporary views or UDFs

### What changes were proposed in this pull request?
Based on the discussion in [SPARK-18209](https://issues.apache.org/jira/browse/SPARK-18209), it doesn't really make sense to create permanent views based on temporary views or temporary UDFs.

To disallow this and raise an exception instead, this PR needs to detect whether a temporary view/UDF is being used when defining a permanent view. Basically, this PR can be split into two sub-tasks:

**Task 1:** detecting a temporary view from the query plan of view definition.
When finding an unresolved temporary view, Analyzer replaces it by a `SubqueryAlias` with the corresponding logical plan, which is stored in an in-memory HashMap. After replacement, it is impossible to detect whether the `SubqueryAlias` is added/generated from a temporary view. Thus, to detect the usage of a temporary view in view definition, this PR traverses the unresolved logical plan and uses the name of an `UnresolvedRelation` to detect whether it is a (global) temporary view.

**Task 2:** detecting a temporary UDF from the query plan of view definition.
Detecting usage of a temporary UDF in view definition is not straightforward.

First, in the analyzed plan, functions are represented in several different forms. More importantly, some classes (e.g., `HiveGenericUDF`) are not accessible from `CreateViewCommand`, which is part of `sql/core`. Thus, we use the unanalyzed plan `child` of `CreateViewCommand` to detect the usage of a temporary UDF. Because the plan has already been successfully analyzed, we can assume the functions have been defined/registered.

Second, in Spark, functions come in four forms: Spark built-in functions, built-in Hive functions, permanent UDFs and temporary UDFs. We do not have any direct way to determine whether a function is temporary or not. Thus, we introduce a function `isTemporaryFunction` in `SessionCatalog`. This function contains the detailed logic to determine whether a function is temporary or not.

### How was this patch tested?
Added test cases.

Author: gatorsmile <ga...@gmail.com>

Closes #15764 from gatorsmile/blockTempFromPermViewCreation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1da64e1f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1da64e1f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1da64e1f

Branch: refs/heads/master
Commit: 1da64e1fa0970277d1fb47dec8adca47b068b1ec
Parents: c1a0c66
Author: gatorsmile <ga...@gmail.com>
Authored: Mon Nov 7 18:34:21 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Nov 7 18:34:21 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   | 18 ++++
 .../catalyst/catalog/SessionCatalogSuite.scala  | 28 ++++++
 .../spark/sql/execution/command/views.scala     | 38 +++++++-
 .../spark/sql/hive/HiveSessionCatalog.scala     |  1 +
 .../spark/sql/hive/execution/SQLViewSuite.scala | 99 ++++++++++++++++++--
 5 files changed, 172 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1da64e1f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 2d2120d..c8b61d8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -923,6 +923,24 @@ class SessionCatalog(
     }
   }
 
+  /**
+   * Returns true iff `name` is a temporary function; a nonexistent function yields false.
+   */
+  def isTemporaryFunction(name: FunctionIdentifier): Boolean = {
+    // Copied from HiveSessionCatalog.hiveFunctions; keep both lists in sync when either changes
+    val hiveFunctions = Seq(
+      "hash",
+      "histogram_numeric",
+      "percentile")
+
+    // A temporary function is a function that has been registered in functionRegistry
+    // without a database name, and is neither a built-in function nor a Hive function
+    name.database.isEmpty &&
+      functionRegistry.functionExists(name.funcName) &&
+      !FunctionRegistry.builtin.functionExists(name.funcName) &&
+      !hiveFunctions.contains(name.funcName.toLowerCase)
+  }
+
   protected def failFunctionLookup(name: String): Nothing = {
     throw new NoSuchFunctionException(db = currentDb, func = name)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1da64e1f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index b77fef2..001d9c4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -919,6 +919,34 @@ class SessionCatalogSuite extends SparkFunSuite {
       catalog.lookupFunction(FunctionIdentifier("temp1"), arguments) === Literal(arguments.length))
   }
 
+  test("isTemporaryFunction") {
+    val externalCatalog = newBasicCatalog()
+    val sessionCatalog = new SessionCatalog(externalCatalog)
+
+    // Returns false when no function with that name has been registered yet
+    assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("temp1")))
+
+    val tempFunc1 = (e: Seq[Expression]) => e.head
+    val info1 = new ExpressionInfo("tempFunc1", "temp1")
+    sessionCatalog.createTempFunction("temp1", info1, tempFunc1, ignoreIfExists = false)
+
+    // Returns true once the function has been created as a temporary function
+    assert(sessionCatalog.isTemporaryFunction(FunctionIdentifier("temp1")))
+
+    // Returns false for permanent func1: with explicit db, as a dotted name, and via current db
+    assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1"))
+    assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("func1", Some("db2"))))
+    assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("db2.func1")))
+    sessionCatalog.setCurrentDatabase("db2")
+    assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("func1")))
+
+    // Returns false when the function is built-in or a Hive function
+    assert(FunctionRegistry.builtin.functionExists("sum"))
+    assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("sum")))
+    assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("histogram_numeric")))
+    assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("percentile")))
+  }
+
   test("drop function") {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)

http://git-wip-us.apache.org/repos/asf/spark/blob/1da64e1f/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index bbcd9c4..30472ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -19,14 +19,14 @@ package org.apache.spark.sql.execution.command
 
 import scala.util.control.NonFatal
 
-import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
-import org.apache.spark.sql.types.{MetadataBuilder, StructType}
+import org.apache.spark.sql.types.MetadataBuilder
 
 
 /**
@@ -131,6 +131,10 @@ case class CreateViewCommand(
         s"specified by CREATE VIEW (num: `${userSpecifiedColumns.length}`).")
     }
 
+    // When creating a permanent view, not allowed to reference temporary objects.
+    // This should be called after `qe.assertAnalyzed()` (i.e., `child` can be resolved)
+    verifyTemporaryObjectsNotExists(sparkSession)
+
     val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
       analyzedPlan
     } else {
@@ -173,6 +177,34 @@ case class CreateViewCommand(
   }
 
   /**
+   * Throws an AnalysisException if a permanent view references a temporary view or function.
+   */
+  private def verifyTemporaryObjectsNotExists(sparkSession: SparkSession): Unit = {
+    if (!isTemporary) {
+      // Traverse the unresolved plan `child` instead of the analyzed plan, for two reasons:
+      // 1) Analyzer replaces unresolved temporary views by a SubqueryAlias with the corresponding
+      // logical plan. After replacement, it is impossible to detect whether the SubqueryAlias
+      // was generated from a temporary view.
+      // 2) Temporary functions are represented by multiple classes, most of which are
+      // inaccessible from this package (e.g., HiveGenericUDF).
+      child.collect {
+        // Disallow creating permanent views based on temporary views.
+        case s: UnresolvedRelation
+          if sparkSession.sessionState.catalog.isTemporaryTable(s.tableIdentifier) =>
+          throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
+            s"referencing a temporary view ${s.tableIdentifier}")
+        case other if !other.resolved => other.expressions.flatMap(_.collect {
+          // Disallow creating permanent views based on temporary UDFs.
+          case e: UnresolvedFunction
+            if sparkSession.sessionState.catalog.isTemporaryFunction(e.name) =>
+            throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
+              s"referencing a temporary function `${e.name}`")
+        })
+      }
+    }
+  }
+
+  /**
    * Returns a [[CatalogTable]] that can be used to save in the catalog. This comment canonicalize
    * SQL based on the analyzed plan, and also creates the proper schema for the view.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/1da64e1f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 9df20ce..4a9b28a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -232,6 +232,7 @@ private[sql] class HiveSessionCatalog(
   // current_user, ewah_bitmap, ewah_bitmap_and, ewah_bitmap_empty, ewah_bitmap_or, field,
   // in_file, index, matchpath, ngrams, noop, noopstreaming, noopwithmap,
   // noopwithmapstreaming, parse_url_tuple, reflect2, windowingtablefunction.
+  // Note: don't forget to update SessionCatalog.isTemporaryFunction
   private val hiveFunctions = Seq(
     "histogram_numeric",
     "percentile"

http://git-wip-us.apache.org/repos/asf/spark/blob/1da64e1f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index 2af935d..ba65db7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -38,21 +38,46 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     spark.sql(s"DROP TABLE IF EXISTS jt")
   }
 
-  test("nested views (interleaved with temporary views)") {
-    withView("jtv1", "jtv2", "jtv3", "temp_jtv1", "temp_jtv2", "temp_jtv3") {
+  test("create a permanent view on a permanent view") {
+    withView("jtv1", "jtv2") {
       sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3")
       sql("CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6")
       checkAnswer(sql("select count(*) FROM jtv2"), Row(2))
+    }
+  }
 
-      // Checks temporary views
+  test("create a temp view on a permanent view") {
+    withView("jtv1", "temp_jtv1") {
+      sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3")
+      sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jtv1 WHERE id < 6")
+      checkAnswer(sql("select count(*) FROM temp_jtv1"), Row(2))
+    }
+  }
+
+  test("create a temp view on a temp view") {
+    withView("temp_jtv1", "temp_jtv2") {
       sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jt WHERE id > 3")
       sql("CREATE TEMPORARY VIEW temp_jtv2 AS SELECT * FROM temp_jtv1 WHERE id < 6")
       checkAnswer(sql("select count(*) FROM temp_jtv2"), Row(2))
+    }
+  }
+
+  test("create a permanent view on a temp view") {
+    withView("jtv1", "temp_jtv1", "global_temp_jtv1") {
+      sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jt WHERE id > 3")
+      var e = intercept[AnalysisException] {
+        sql("CREATE VIEW jtv1 AS SELECT * FROM temp_jtv1 WHERE id < 6")
+      }.getMessage
+      assert(e.contains("Not allowed to create a permanent view `jtv1` by " +
+        "referencing a temporary view `temp_jtv1`"))
 
-      // Checks interleaved temporary view and normal view
-      sql("CREATE TEMPORARY VIEW temp_jtv3 AS SELECT * FROM jt WHERE id > 3")
-      sql("CREATE VIEW jtv3 AS SELECT * FROM temp_jtv3 WHERE id < 6")
-      checkAnswer(sql("select count(*) FROM jtv3"), Row(2))
+      val globalTempDB = spark.sharedState.globalTempViewManager.database
+      sql("CREATE GLOBAL TEMP VIEW global_temp_jtv1 AS SELECT * FROM jt WHERE id > 0")
+      e = intercept[AnalysisException] {
+        sql(s"CREATE VIEW jtv1 AS SELECT * FROM $globalTempDB.global_temp_jtv1 WHERE id < 6")
+      }.getMessage
+      assert(e.contains(s"Not allowed to create a permanent view `jtv1` by referencing " +
+        s"a temporary view `global_temp`.`global_temp_jtv1`"))
     }
   }
 
@@ -439,7 +464,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
-  test("SPARK-14933 - create view from hive parquet tabale") {
+  test("SPARK-14933 - create view from hive parquet table") {
     withTable("t_part") {
       withView("v_part") {
         spark.sql("create table t_part stored as parquet as select 1 as a, 2 as b")
@@ -451,7 +476,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
-  test("SPARK-14933 - create view from hive orc tabale") {
+  test("SPARK-14933 - create view from hive orc table") {
     withTable("t_orc") {
       withView("v_orc") {
         spark.sql("create table t_orc stored as orc as select 1 as a, 2 as b")
@@ -462,4 +487,60 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
+
+  test("create a permanent/temp view using a hive, built-in, and permanent user function") {
+    val permanentFuncName = "myUpper"
+    val permanentFuncClass =
+      classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper].getCanonicalName
+    val builtInFuncNameInLowerCase = "abs"
+    val builtInFuncNameInMixedCase = "aBs" // same built-in as above, spelled in mixed case
+    val hiveFuncName = "histogram_numeric"
+
+    withUserDefinedFunction(permanentFuncName -> false) {
+      sql(s"CREATE FUNCTION $permanentFuncName AS '$permanentFuncClass'")
+      withTable("tab1") {
+        (1 to 10).map(i => (s"$i", i)).toDF("str", "id").write.saveAsTable("tab1")
+        Seq("VIEW", "TEMPORARY VIEW").foreach { viewMode => // both view kinds must succeed
+          withView("view1") {
+            sql(
+              s"""
+                 |CREATE $viewMode view1
+                 |AS SELECT
+                 |$permanentFuncName(str),
+                 |$builtInFuncNameInLowerCase(id),
+                 |$builtInFuncNameInMixedCase(id) as aBs,
+                 |$hiveFuncName(id, 5) over()
+                 |FROM tab1
+               """.stripMargin)
+            checkAnswer(sql("select count(*) FROM view1"), Row(10))
+          }
+        }
+      }
+    }
+  }
+
+  test("create a permanent/temp view using a temporary function") {
+    val tempFunctionName = "temp"
+    val functionClass =
+      classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper].getCanonicalName
+    withUserDefinedFunction(tempFunctionName -> true) {
+      sql(s"CREATE TEMPORARY FUNCTION $tempFunctionName AS '$functionClass'")
+      withView("view1", "tempView1") {
+        withTable("tab1") {
+          (1 to 10).map(i => s"$i").toDF("id").write.saveAsTable("tab1")
+
+          // A temporary view is allowed to reference a temporary function
+          sql(s"CREATE TEMPORARY VIEW tempView1 AS SELECT $tempFunctionName(id) from tab1")
+          checkAnswer(sql("select count(*) FROM tempView1"), Row(10))
+
+          // A permanent view referencing a temporary function must fail with AnalysisException
+          val e = intercept[AnalysisException] {
+            sql(s"CREATE VIEW view1 AS SELECT $tempFunctionName(id) from tab1")
+          }.getMessage
+          assert(e.contains("Not allowed to create a permanent view `view1` by referencing " +
+            s"a temporary function `$tempFunctionName`"))
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org