You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2016/09/02 14:31:18 UTC
spark git commit: [SPARK-16935][SQL] Verification of Function-related
ExternalCatalog APIs
Repository: spark
Updated Branches:
refs/heads/master 7ee24dac8 -> 247a4faf0
[SPARK-16935][SQL] Verification of Function-related ExternalCatalog APIs
### What changes were proposed in this pull request?
Function-related `HiveExternalCatalog` APIs do not have enough verification logic. After this PR, `HiveExternalCatalog` and `InMemoryCatalog` become consistent in their error handling.
For example, below is the exception we got when calling `renameFunction`.
```
15:13:40.369 WARN org.apache.hadoop.hive.metastore.ObjectStore: Failed to get database db1, returning NoSuchObjectException
15:13:40.377 WARN org.apache.hadoop.hive.metastore.ObjectStore: Failed to get database db2, returning NoSuchObjectException
15:13:40.739 ERROR DataNucleus.Datastore.Persist: Update of object "org.apache.hadoop.hive.metastore.model.MFunction205629e9" using statement "UPDATE FUNCS SET FUNC_NAME=? WHERE FUNC_ID=?" failed : org.apache.derby.shared.common.error.DerbySQLIntegrityConstraintViolationException: The statement was aborted because it would have caused a duplicate key value in a unique or primary key constraint or unique index identified by 'UNIQUEFUNCTION' defined on 'FUNCS'.
at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown Source)
at org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown Source)
at org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException(Unknown Source)
```
### How was this patch tested?
Improved the existing test cases to check whether the messages are right.
Author: gatorsmile <ga...@gmail.com>
Closes #14521 from gatorsmile/functionChecking.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/247a4faf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/247a4faf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/247a4faf
Branch: refs/heads/master
Commit: 247a4faf06c1dd47a6543c56929cd0182a03e106
Parents: 7ee24da
Author: gatorsmile <ga...@gmail.com>
Authored: Fri Sep 2 22:31:01 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Sep 2 22:31:01 2016 +0800
----------------------------------------------------------------------
.../sql/catalyst/catalog/ExternalCatalog.scala | 14 ++++++++++++-
.../sql/catalyst/catalog/InMemoryCatalog.scala | 19 ++----------------
.../catalyst/catalog/ExternalCatalogSuite.scala | 21 ++++++++++----------
.../spark/sql/hive/HiveExternalCatalog.scala | 8 ++++++++
4 files changed, 34 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/247a4faf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 27e1810..df72baa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.catalog
-import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
+import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
/**
@@ -38,6 +38,18 @@ abstract class ExternalCatalog {
}
}
+ protected def requireFunctionExists(db: String, funcName: String): Unit = {
+ if (!functionExists(db, funcName)) {
+ throw new NoSuchFunctionException(db = db, func = funcName)
+ }
+ }
+
+ protected def requireFunctionNotExists(db: String, funcName: String): Unit = {
+ if (functionExists(db, funcName)) {
+ throw new FunctionAlreadyExistsException(db = db, func = funcName)
+ }
+ }
+
// --------------------------------------------------------------------------
// Databases
// --------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/247a4faf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index b55ddcb..4e361a5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -63,18 +63,6 @@ class InMemoryCatalog(
catalog(db).tables(table).partitions.contains(spec)
}
- private def requireFunctionExists(db: String, funcName: String): Unit = {
- if (!functionExists(db, funcName)) {
- throw new NoSuchFunctionException(db = db, func = funcName)
- }
- }
-
- private def requireFunctionNotExists(db: String, funcName: String): Unit = {
- if (functionExists(db, funcName)) {
- throw new FunctionAlreadyExistsException(db = db, func = funcName)
- }
- }
-
private def requireTableExists(db: String, table: String): Unit = {
if (!tableExists(db, table)) {
throw new NoSuchTableException(db = db, table = table)
@@ -474,11 +462,8 @@ class InMemoryCatalog(
override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
requireDbExists(db)
- if (functionExists(db, func.identifier.funcName)) {
- throw new FunctionAlreadyExistsException(db = db, func = func.identifier.funcName)
- } else {
- catalog(db).functions.put(func.identifier.funcName, func)
- }
+ requireFunctionNotExists(db, func.identifier.funcName)
+ catalog(db).functions.put(func.identifier.funcName, func)
}
override def dropFunction(db: String, funcName: String): Unit = synchronized {
http://git-wip-us.apache.org/repos/asf/spark/blob/247a4faf/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 19f8665..f283f42 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@@ -450,14 +451,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
test("create function when database does not exist") {
val catalog = newBasicCatalog()
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.createFunction("does_not_exist", newFunc())
}
}
test("create function that already exists") {
val catalog = newBasicCatalog()
- intercept[AnalysisException] {
+ intercept[FunctionAlreadyExistsException] {
catalog.createFunction("db2", newFunc("func1"))
}
}
@@ -471,14 +472,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
test("drop function when database does not exist") {
val catalog = newBasicCatalog()
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.dropFunction("does_not_exist", "something")
}
}
test("drop function that does not exist") {
val catalog = newBasicCatalog()
- intercept[AnalysisException] {
+ intercept[NoSuchFunctionException] {
catalog.dropFunction("db2", "does_not_exist")
}
}
@@ -488,14 +489,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
assert(catalog.getFunction("db2", "func1") ==
CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
Seq.empty[FunctionResource]))
- intercept[AnalysisException] {
+ intercept[NoSuchFunctionException] {
catalog.getFunction("db2", "does_not_exist")
}
}
test("get function when database does not exist") {
val catalog = newBasicCatalog()
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.getFunction("does_not_exist", "func1")
}
}
@@ -505,15 +506,15 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val newName = "funcky"
assert(catalog.getFunction("db2", "func1").className == funcClass)
catalog.renameFunction("db2", "func1", newName)
- intercept[AnalysisException] { catalog.getFunction("db2", "func1") }
+ intercept[NoSuchFunctionException] { catalog.getFunction("db2", "func1") }
assert(catalog.getFunction("db2", newName).identifier.funcName == newName)
assert(catalog.getFunction("db2", newName).className == funcClass)
- intercept[AnalysisException] { catalog.renameFunction("db2", "does_not_exist", "me") }
+ intercept[NoSuchFunctionException] { catalog.renameFunction("db2", "does_not_exist", "me") }
}
test("rename function when database does not exist") {
val catalog = newBasicCatalog()
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.renameFunction("does_not_exist", "func1", "func5")
}
}
@@ -521,7 +522,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
test("rename function when new function already exists") {
val catalog = newBasicCatalog()
catalog.createFunction("db2", newFunc("func2", Some("db2")))
- intercept[AnalysisException] {
+ intercept[FunctionAlreadyExistsException] {
catalog.renameFunction("db2", "func1", "func2")
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/247a4faf/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index ed87ac3..8541ae2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -570,31 +570,39 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
override def createFunction(
db: String,
funcDefinition: CatalogFunction): Unit = withClient {
+ requireDbExists(db)
// Hive's metastore is case insensitive. However, Hive's createFunction does
// not normalize the function name (unlike the getFunction part). So,
// we are normalizing the function name.
val functionName = funcDefinition.identifier.funcName.toLowerCase
+ requireFunctionNotExists(db, functionName)
val functionIdentifier = funcDefinition.identifier.copy(funcName = functionName)
client.createFunction(db, funcDefinition.copy(identifier = functionIdentifier))
}
override def dropFunction(db: String, name: String): Unit = withClient {
+ requireFunctionExists(db, name)
client.dropFunction(db, name)
}
override def renameFunction(db: String, oldName: String, newName: String): Unit = withClient {
+ requireFunctionExists(db, oldName)
+ requireFunctionNotExists(db, newName)
client.renameFunction(db, oldName, newName)
}
override def getFunction(db: String, funcName: String): CatalogFunction = withClient {
+ requireFunctionExists(db, funcName)
client.getFunction(db, funcName)
}
override def functionExists(db: String, funcName: String): Boolean = withClient {
+ requireDbExists(db)
client.functionExists(db, funcName)
}
override def listFunctions(db: String, pattern: String): Seq[String] = withClient {
+ requireDbExists(db)
client.listFunctions(db, pattern)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org