Posted to commits@spark.apache.org by we...@apache.org on 2016/09/02 14:31:18 UTC

spark git commit: [SPARK-16935][SQL] Verification of Function-related ExternalCatalog APIs

Repository: spark
Updated Branches:
  refs/heads/master 7ee24dac8 -> 247a4faf0


[SPARK-16935][SQL] Verification of Function-related ExternalCatalog APIs

### What changes were proposed in this pull request?
Function-related `HiveExternalCatalog` APIs do not have enough verification logic. After this PR, `HiveExternalCatalog` and `InMemoryCatalog` handle errors consistently.

For example, below is the exception we got when calling `renameFunction` with a new name that already exists. Instead of a clear catalyst-level error, the raw Derby unique-constraint violation leaks through:
```
15:13:40.369 WARN org.apache.hadoop.hive.metastore.ObjectStore: Failed to get database db1, returning NoSuchObjectException
15:13:40.377 WARN org.apache.hadoop.hive.metastore.ObjectStore: Failed to get database db2, returning NoSuchObjectException
15:13:40.739 ERROR DataNucleus.Datastore.Persist: Update of object "org.apache.hadoop.hive.metastore.model.MFunction205629e9" using statement "UPDATE FUNCS SET FUNC_NAME=? WHERE FUNC_ID=?" failed : org.apache.derby.shared.common.error.DerbySQLIntegrityConstraintViolationException: The statement was aborted because it would have caused a duplicate key value in a unique or primary key constraint or unique index identified by 'UNIQUEFUNCTION' defined on 'FUNCS'.
	at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
	at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown Source)
	at org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown Source)
	at org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException(Unknown Source)
```
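
After this patch, the same call fails fast with a `FunctionAlreadyExistsException` before the metastore is touched. A minimal sketch of the resulting behavior (the `renameOrReport` helper is hypothetical, written only for illustration; it works against any `ExternalCatalog` implementation, `HiveExternalCatalog` or `InMemoryCatalog` alike):
```scala
import org.apache.spark.sql.catalyst.analysis.FunctionAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog

// Hypothetical helper, for illustration only: after this patch both catalog
// implementations pre-check the target name and throw the same exception.
def renameOrReport(
    catalog: ExternalCatalog, db: String, oldName: String, newName: String): Unit = {
  try {
    catalog.renameFunction(db, oldName, newName)
  } catch {
    case e: FunctionAlreadyExistsException =>
      // Fails fast before any metastore update, instead of surfacing the
      // Derby 'UNIQUEFUNCTION' constraint violation shown above.
      println(s"Cannot rename: ${e.getMessage}")
  }
}
```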

### How was this patch tested?
Improved the existing test cases to check that the expected exceptions are thrown and that their error messages are correct.
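
The tightened tests assert the specific exception type rather than the generic `AnalysisException`. A representative case, adapted from the suite changes below (the message assertion is an illustrative addition, assuming the exception text names the missing function):
```scala
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException

test("drop function that does not exist") {
  val catalog = newBasicCatalog()
  // intercept returns the thrown exception, so the message can be checked too.
  val e = intercept[NoSuchFunctionException] {
    catalog.dropFunction("db2", "does_not_exist")
  }
  assert(e.getMessage.contains("does_not_exist"))  // assumed message format
}
```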

Author: gatorsmile <ga...@gmail.com>

Closes #14521 from gatorsmile/functionChecking.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/247a4faf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/247a4faf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/247a4faf

Branch: refs/heads/master
Commit: 247a4faf06c1dd47a6543c56929cd0182a03e106
Parents: 7ee24da
Author: gatorsmile <ga...@gmail.com>
Authored: Fri Sep 2 22:31:01 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Sep 2 22:31:01 2016 +0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/ExternalCatalog.scala  | 14 ++++++++++++-
 .../sql/catalyst/catalog/InMemoryCatalog.scala  | 19 ++----------------
 .../catalyst/catalog/ExternalCatalogSuite.scala | 21 ++++++++++----------
 .../spark/sql/hive/HiveExternalCatalog.scala    |  8 ++++++++
 4 files changed, 34 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/247a4faf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 27e1810..df72baa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
-import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
+import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
 
 
 /**
@@ -38,6 +38,18 @@ abstract class ExternalCatalog {
     }
   }
 
+  protected def requireFunctionExists(db: String, funcName: String): Unit = {
+    if (!functionExists(db, funcName)) {
+      throw new NoSuchFunctionException(db = db, func = funcName)
+    }
+  }
+
+  protected def requireFunctionNotExists(db: String, funcName: String): Unit = {
+    if (functionExists(db, funcName)) {
+      throw new FunctionAlreadyExistsException(db = db, func = funcName)
+    }
+  }
+
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/247a4faf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index b55ddcb..4e361a5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -63,18 +63,6 @@ class InMemoryCatalog(
     catalog(db).tables(table).partitions.contains(spec)
   }
 
-  private def requireFunctionExists(db: String, funcName: String): Unit = {
-    if (!functionExists(db, funcName)) {
-      throw new NoSuchFunctionException(db = db, func = funcName)
-    }
-  }
-
-  private def requireFunctionNotExists(db: String, funcName: String): Unit = {
-    if (functionExists(db, funcName)) {
-      throw new FunctionAlreadyExistsException(db = db, func = funcName)
-    }
-  }
-
   private def requireTableExists(db: String, table: String): Unit = {
     if (!tableExists(db, table)) {
       throw new NoSuchTableException(db = db, table = table)
@@ -474,11 +462,8 @@ class InMemoryCatalog(
 
   override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
     requireDbExists(db)
-    if (functionExists(db, func.identifier.funcName)) {
-      throw new FunctionAlreadyExistsException(db = db, func = func.identifier.funcName)
-    } else {
-      catalog(db).functions.put(func.identifier.funcName, func)
-    }
+    requireFunctionNotExists(db, func.identifier.funcName)
+    catalog(db).functions.put(func.identifier.funcName, func)
   }
 
   override def dropFunction(db: String, funcName: String): Unit = synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/247a4faf/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 19f8665..f283f42 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
@@ -450,14 +451,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
 
   test("create function when database does not exist") {
     val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
+    intercept[NoSuchDatabaseException] {
       catalog.createFunction("does_not_exist", newFunc())
     }
   }
 
   test("create function that already exists") {
     val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
+    intercept[FunctionAlreadyExistsException] {
       catalog.createFunction("db2", newFunc("func1"))
     }
   }
@@ -471,14 +472,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
 
   test("drop function when database does not exist") {
     val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
+    intercept[NoSuchDatabaseException] {
       catalog.dropFunction("does_not_exist", "something")
     }
   }
 
   test("drop function that does not exist") {
     val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
+    intercept[NoSuchFunctionException] {
       catalog.dropFunction("db2", "does_not_exist")
     }
   }
@@ -488,14 +489,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(catalog.getFunction("db2", "func1") ==
       CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
         Seq.empty[FunctionResource]))
-    intercept[AnalysisException] {
+    intercept[NoSuchFunctionException] {
       catalog.getFunction("db2", "does_not_exist")
     }
   }
 
   test("get function when database does not exist") {
     val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
+    intercept[NoSuchDatabaseException] {
       catalog.getFunction("does_not_exist", "func1")
     }
   }
@@ -505,15 +506,15 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val newName = "funcky"
     assert(catalog.getFunction("db2", "func1").className == funcClass)
     catalog.renameFunction("db2", "func1", newName)
-    intercept[AnalysisException] { catalog.getFunction("db2", "func1") }
+    intercept[NoSuchFunctionException] { catalog.getFunction("db2", "func1") }
     assert(catalog.getFunction("db2", newName).identifier.funcName == newName)
     assert(catalog.getFunction("db2", newName).className == funcClass)
-    intercept[AnalysisException] { catalog.renameFunction("db2", "does_not_exist", "me") }
+    intercept[NoSuchFunctionException] { catalog.renameFunction("db2", "does_not_exist", "me") }
   }
 
   test("rename function when database does not exist") {
     val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
+    intercept[NoSuchDatabaseException] {
       catalog.renameFunction("does_not_exist", "func1", "func5")
     }
   }
@@ -521,7 +522,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
   test("rename function when new function already exists") {
     val catalog = newBasicCatalog()
     catalog.createFunction("db2", newFunc("func2", Some("db2")))
-    intercept[AnalysisException] {
+    intercept[FunctionAlreadyExistsException] {
       catalog.renameFunction("db2", "func1", "func2")
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/247a4faf/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index ed87ac3..8541ae2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -570,31 +570,39 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   override def createFunction(
       db: String,
       funcDefinition: CatalogFunction): Unit = withClient {
+    requireDbExists(db)
     // Hive's metastore is case insensitive. However, Hive's createFunction does
     // not normalize the function name (unlike the getFunction part). So,
     // we are normalizing the function name.
     val functionName = funcDefinition.identifier.funcName.toLowerCase
+    requireFunctionNotExists(db, functionName)
     val functionIdentifier = funcDefinition.identifier.copy(funcName = functionName)
     client.createFunction(db, funcDefinition.copy(identifier = functionIdentifier))
   }
 
   override def dropFunction(db: String, name: String): Unit = withClient {
+    requireFunctionExists(db, name)
     client.dropFunction(db, name)
   }
 
   override def renameFunction(db: String, oldName: String, newName: String): Unit = withClient {
+    requireFunctionExists(db, oldName)
+    requireFunctionNotExists(db, newName)
     client.renameFunction(db, oldName, newName)
   }
 
   override def getFunction(db: String, funcName: String): CatalogFunction = withClient {
+    requireFunctionExists(db, funcName)
     client.getFunction(db, funcName)
   }
 
   override def functionExists(db: String, funcName: String): Boolean = withClient {
+    requireDbExists(db)
     client.functionExists(db, funcName)
   }
 
   override def listFunctions(db: String, pattern: String): Seq[String] = withClient {
+    requireDbExists(db)
     client.listFunctions(db, pattern)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org