Posted to commits@spark.apache.org by rx...@apache.org on 2016/09/30 00:56:35 UTC

spark git commit: [SPARK-17717][SQL] Add exist/find methods to Catalog.

Repository: spark
Updated Branches:
  refs/heads/master 2f7395670 -> 74ac1c438


[SPARK-17717][SQL] Add exist/find methods to Catalog.

## What changes were proposed in this pull request?
The current user-facing catalog does not implement methods for checking object existence or for finding objects. You could theoretically do this using the `list*` commands, but that is cumbersome and can be costly when the catalog contains many objects. This PR adds `exists*` and `find*` methods for Databases, Tables, and Functions.
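
As a quick illustration, here is a minimal sketch of how the new API can be called (assuming a `SparkSession` named `spark`; the database `db1` and table `sales` are hypothetical names used only for illustration):

```scala
// Hypothetical names: `db1` and `sales` are assumed to exist.
if (spark.catalog.databaseExists("db1")) {
  // exists* returns a Boolean instead of forcing a full `list*` scan.
  if (spark.catalog.tableExists("db1", "sales")) {
    // find* returns the metadata object, or throws an AnalysisException if it is missing.
    val table = spark.catalog.findTable("db1", "sales")
    println(s"${table.name} (type: ${table.tableType}, temporary: ${table.isTemporary})")
  }
}
```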

## How was this patch tested?
Added tests to `org.apache.spark.sql.internal.CatalogSuite`

Author: Herman van Hovell <hv...@databricks.com>

Closes #15301 from hvanhovell/SPARK-17717.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/74ac1c43
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/74ac1c43
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/74ac1c43

Branch: refs/heads/master
Commit: 74ac1c43817c0b8da70342e540ec7638dd7d01bd
Parents: 2f73956
Author: Herman van Hovell <hv...@databricks.com>
Authored: Thu Sep 29 17:56:32 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Sep 29 17:56:32 2016 -0700

----------------------------------------------------------------------
 project/MimaExcludes.scala                      |  11 +-
 .../org/apache/spark/sql/catalog/Catalog.scala  |  83 ++++++++++
 .../apache/spark/sql/internal/CatalogImpl.scala | 152 ++++++++++++++++---
 .../spark/sql/internal/CatalogSuite.scala       | 118 ++++++++++++++
 4 files changed, 339 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/74ac1c43/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 4db3edb..2ffe0ac 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -46,7 +46,16 @@ object MimaExcludes {
       // [SPARK-16967] Move Mesos to Module
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkMasterRegex.MESOS_REGEX"),
       // [SPARK-16240] ML persistence backward compatibility for LDA
-      ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$")
+      ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$"),
+      // [SPARK-17717] Add find and exists methods to Catalog.
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findDatabase"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findTable"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findFunction"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findColumn"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.databaseExists"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.tableExists"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.functionExists"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.columnExists")
     )
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/74ac1c43/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 1aed245..b439022 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -102,6 +102,89 @@ abstract class Catalog {
   def listColumns(dbName: String, tableName: String): Dataset[Column]
 
   /**
+   * Find the database with the specified name. This throws an AnalysisException when the database
+   * cannot be found.
+   *
+   * @since 2.1.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def findDatabase(dbName: String): Database
+
+  /**
+   * Find the table with the specified name. This table can be a temporary table or a table in the
+   * current database. This throws an AnalysisException when the table cannot be found.
+   *
+   * @since 2.1.0
+   */
+  @throws[AnalysisException]("table does not exist")
+  def findTable(tableName: String): Table
+
+  /**
+   * Find the table with the specified name in the specified database. This throws an
+   * AnalysisException when the table cannot be found.
+   *
+   * @since 2.1.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  def findTable(dbName: String, tableName: String): Table
+
+  /**
+   * Find the function with the specified name. This function can be a temporary function or a
+   * function in the current database. This throws an AnalysisException when the function cannot
+   * be found.
+   *
+   * @since 2.1.0
+   */
+  @throws[AnalysisException]("function does not exist")
+  def findFunction(functionName: String): Function
+
+  /**
+   * Find the function with the specified name in the specified database. This throws an
+   * AnalysisException when the function cannot be found.
+   *
+   * @since 2.1.0
+   */
+  @throws[AnalysisException]("database or function does not exist")
+  def findFunction(dbName: String, functionName: String): Function
+
+  /**
+   * Check if the database with the specified name exists.
+   *
+   * @since 2.1.0
+   */
+  def databaseExists(dbName: String): Boolean
+
+  /**
+   * Check if the table with the specified name exists. This can either be a temporary table or a
+   * table in the current database.
+   *
+   * @since 2.1.0
+   */
+  def tableExists(tableName: String): Boolean
+
+  /**
+   * Check if the table with the specified name exists in the specified database.
+   *
+   * @since 2.1.0
+   */
+  def tableExists(dbName: String, tableName: String): Boolean
+
+  /**
+   * Check if the function with the specified name exists. This can either be a temporary function
+   * or a function in the current database.
+   *
+   * @since 2.1.0
+   */
+  def functionExists(functionName: String): Boolean
+
+  /**
+   * Check if the function with the specified name exists in the specified database.
+   *
+   * @since 2.1.0
+   */
+  def functionExists(dbName: String, functionName: String): Boolean
+
+  /**
    * :: Experimental ::
    * Creates an external table from the given path and returns the corresponding DataFrame.
    * It will use the default data source configured by spark.sql.sources.default.

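Note the split in the contract above: `find*` throws an `AnalysisException` when the object is missing, while `exists*` returns a `Boolean`. A caller who prefers an `Option`-based style can bridge the two with a small adapter; a minimal sketch (the helper `findTableOption` is hypothetical, not part of this patch):

```scala
import scala.util.Try

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalog.Table

// Hypothetical adapter: turn the throwing find* style into an Option,
// mirroring the Boolean style of tableExists.
def findTableOption(spark: SparkSession, dbName: String, tableName: String): Option[Table] =
  Try(spark.catalog.findTable(dbName, tableName)).toOption
```
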
http://git-wip-us.apache.org/repos/asf/spark/blob/74ac1c43/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index f252535..a1087ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -23,10 +23,10 @@ import scala.reflect.runtime.universe.TypeTag
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table}
-import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog}
+import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.types.StructType
 
@@ -69,15 +69,18 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    */
   override def listDatabases(): Dataset[Database] = {
     val databases = sessionCatalog.listDatabases().map { dbName =>
-      val metadata = sessionCatalog.getDatabaseMetadata(dbName)
-      new Database(
-        name = metadata.name,
-        description = metadata.description,
-        locationUri = metadata.locationUri)
+      makeDatabase(sessionCatalog.getDatabaseMetadata(dbName))
     }
     CatalogImpl.makeDataset(databases, sparkSession)
   }
 
+  private def makeDatabase(metadata: CatalogDatabase): Database = {
+    new Database(
+      name = metadata.name,
+      description = metadata.description,
+      locationUri = metadata.locationUri)
+  }
+
   /**
    * Returns a list of tables in the current database.
    * This includes all temporary tables.
@@ -94,18 +97,21 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   override def listTables(dbName: String): Dataset[Table] = {
     requireDatabaseExists(dbName)
     val tables = sessionCatalog.listTables(dbName).map { tableIdent =>
-      val isTemp = tableIdent.database.isEmpty
-      val metadata = if (isTemp) None else Some(sessionCatalog.getTableMetadata(tableIdent))
-      new Table(
-        name = tableIdent.identifier,
-        database = metadata.flatMap(_.identifier.database).orNull,
-        description = metadata.flatMap(_.comment).orNull,
-        tableType = metadata.map(_.tableType.name).getOrElse("TEMPORARY"),
-        isTemporary = isTemp)
+      makeTable(tableIdent, tableIdent.database.isEmpty)
     }
     CatalogImpl.makeDataset(tables, sparkSession)
   }
 
+  private def makeTable(tableIdent: TableIdentifier, isTemp: Boolean): Table = {
+    val metadata = if (isTemp) None else Some(sessionCatalog.getTableMetadata(tableIdent))
+    new Table(
+      name = tableIdent.identifier,
+      database = metadata.flatMap(_.identifier.database).orNull,
+      description = metadata.flatMap(_.comment).orNull,
+      tableType = metadata.map(_.tableType.name).getOrElse("TEMPORARY"),
+      isTemporary = isTemp)
+  }
+
   /**
    * Returns a list of functions registered in the current database.
    * This includes all temporary functions.
@@ -121,18 +127,22 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   @throws[AnalysisException]("database does not exist")
   override def listFunctions(dbName: String): Dataset[Function] = {
     requireDatabaseExists(dbName)
-    val functions = sessionCatalog.listFunctions(dbName).map { case (funcIdent, _) =>
-      val metadata = sessionCatalog.lookupFunctionInfo(funcIdent)
-      new Function(
-        name = funcIdent.identifier,
-        database = funcIdent.database.orNull,
-        description = null, // for now, this is always undefined
-        className = metadata.getClassName,
-        isTemporary = funcIdent.database.isEmpty)
+    val functions = sessionCatalog.listFunctions(dbName).map { case (funcIdent, _) =>
+      makeFunction(funcIdent)
     }
     CatalogImpl.makeDataset(functions, sparkSession)
   }
 
+  private def makeFunction(funcIdent: FunctionIdentifier): Function = {
+    val metadata = sessionCatalog.lookupFunctionInfo(funcIdent)
+    new Function(
+      name = funcIdent.identifier,
+      database = funcIdent.database.orNull,
+      description = null, // for now, this is always undefined
+      className = metadata.getClassName,
+      isTemporary = funcIdent.database.isEmpty)
+  }
+
   /**
    * Returns a list of columns for the given table in the current database.
    */
@@ -168,6 +178,100 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   }
 
   /**
+   * Find the database with the specified name. This throws an [[AnalysisException]] when no
+   * [[Database]] can be found.
+   */
+  override def findDatabase(dbName: String): Database = {
+    if (sessionCatalog.databaseExists(dbName)) {
+      makeDatabase(sessionCatalog.getDatabaseMetadata(dbName))
+    } else {
+      throw new AnalysisException(s"The specified database $dbName does not exist.")
+    }
+  }
+
+  /**
+   * Find the table with the specified name. This table can be a temporary table or a table in the
+   * current database. This throws an [[AnalysisException]] when no [[Table]] can be found.
+   */
+  override def findTable(tableName: String): Table = {
+    findTable(null, tableName)
+  }
+
+  /**
+   * Find the table with the specified name in the specified database. This throws an
+   * [[AnalysisException]] when no [[Table]] can be found.
+   */
+  override def findTable(dbName: String, tableName: String): Table = {
+    val tableIdent = TableIdentifier(tableName, Option(dbName))
+    val isTemporary = sessionCatalog.isTemporaryTable(tableIdent)
+    if (isTemporary || sessionCatalog.tableExists(tableIdent)) {
+      makeTable(tableIdent, isTemporary)
+    } else {
+      throw new AnalysisException(s"The specified table $tableIdent does not exist.")
+    }
+  }
+
+  /**
+   * Find the function with the specified name. This function can be a temporary function or a
+   * function in the current database. This throws an [[AnalysisException]] when no [[Function]]
+   * can be found.
+   */
+  override def findFunction(functionName: String): Function = {
+    findFunction(null, functionName)
+  }
+
+  /**
+   * Find the function with the specified name in the specified database. This throws an
+   * [[AnalysisException]] when no [[Function]] can be found.
+   */
+  override def findFunction(dbName: String, functionName: String): Function = {
+    val functionIdent = FunctionIdentifier(functionName, Option(dbName))
+    if (sessionCatalog.functionExists(functionIdent)) {
+      makeFunction(functionIdent)
+    } else {
+      throw new AnalysisException(s"The specified function $functionIdent does not exist.")
+    }
+  }
+
+  /**
+   * Check if the database with the specified name exists.
+   */
+  override def databaseExists(dbName: String): Boolean = {
+    sessionCatalog.databaseExists(dbName)
+  }
+
+  /**
+   * Check if the table with the specified name exists. This can either be a temporary table or a
+   * table in the current database.
+   */
+  override def tableExists(tableName: String): Boolean = {
+    tableExists(null, tableName)
+  }
+
+  /**
+   * Check if the table with the specified name exists in the specified database.
+   */
+  override def tableExists(dbName: String, tableName: String): Boolean = {
+    val tableIdent = TableIdentifier(tableName, Option(dbName))
+    sessionCatalog.isTemporaryTable(tableIdent) || sessionCatalog.tableExists(tableIdent)
+  }
+
+  /**
+   * Check if the function with the specified name exists. This can either be a temporary function
+   * or a function in the current database.
+   */
+  override def functionExists(functionName: String): Boolean = {
+    functionExists(null, functionName)
+  }
+
+  /**
+   * Check if the function with the specified name exists in the specified database.
+   */
+  override def functionExists(dbName: String, functionName: String): Boolean = {
+    sessionCatalog.functionExists(FunctionIdentifier(functionName, Option(dbName)))
+  }
+
+  /**
    * :: Experimental ::
    * Creates an external table from the given path and returns the corresponding DataFrame.
    * It will use the default data source configured by spark.sql.sources.default.

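One detail of the implementation above: the single-argument overloads delegate by passing `dbName = null`, and `Option(null)` collapses to `None`, so the resulting identifier is unqualified and resolves against temporary tables and the current database. A standalone sketch of that idiom:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Option(...) maps null to None, which is why passing a null dbName
// produces an unqualified identifier in CatalogImpl.
val unqualified = TableIdentifier("tbl_x", Option(null: String)) // database = None
val qualified   = TableIdentifier("tbl_y", Option("db1"))        // database = Some("db1")
assert(unqualified.database.isEmpty)
assert(qualified.database.contains("db1"))
```
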
http://git-wip-us.apache.org/repos/asf/spark/blob/74ac1c43/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 3dc67ff..783bf77 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -340,6 +340,124 @@ class CatalogSuite
     }
   }
 
+  test("find database") {
+    intercept[AnalysisException](spark.catalog.findDatabase("db10"))
+    withTempDatabase { db =>
+      assert(spark.catalog.findDatabase(db).name === db)
+    }
+  }
+
+  test("find table") {
+    withTempDatabase { db =>
+      withTable(s"tbl_x", s"$db.tbl_y") {
+        // Try to find non-existent tables.
+        intercept[AnalysisException](spark.catalog.findTable("tbl_x"))
+        intercept[AnalysisException](spark.catalog.findTable("tbl_y"))
+        intercept[AnalysisException](spark.catalog.findTable(db, "tbl_y"))
+
+        // Create objects.
+        createTempTable("tbl_x")
+        createTable("tbl_y", Some(db))
+
+        // Find a temporary table
+        assert(spark.catalog.findTable("tbl_x").name === "tbl_x")
+
+        // Find a qualified table
+        assert(spark.catalog.findTable(db, "tbl_y").name === "tbl_y")
+
+        // Find an unqualified table using the current database
+        intercept[AnalysisException](spark.catalog.findTable("tbl_y"))
+        spark.catalog.setCurrentDatabase(db)
+        assert(spark.catalog.findTable("tbl_y").name === "tbl_y")
+      }
+    }
+  }
+
+  test("find function") {
+    withTempDatabase { db =>
+      withUserDefinedFunction("fn1" -> true, s"$db.fn2" -> false) {
+        // Try to find non-existent functions.
+        intercept[AnalysisException](spark.catalog.findFunction("fn1"))
+        intercept[AnalysisException](spark.catalog.findFunction("fn2"))
+        intercept[AnalysisException](spark.catalog.findFunction(db, "fn2"))
+
+        // Create objects.
+        createTempFunction("fn1")
+        createFunction("fn2", Some(db))
+
+        // Find a temporary function
+        assert(spark.catalog.findFunction("fn1").name === "fn1")
+
+        // Find a qualified function
+        assert(spark.catalog.findFunction(db, "fn2").name === "fn2")
+
+        // Find an unqualified function using the current database
+        intercept[AnalysisException](spark.catalog.findFunction("fn2"))
+        spark.catalog.setCurrentDatabase(db)
+        assert(spark.catalog.findFunction("fn2").name === "fn2")
+      }
+    }
+  }
+
+  test("database exists") {
+    assert(!spark.catalog.databaseExists("db10"))
+    createDatabase("db10")
+    assert(spark.catalog.databaseExists("db10"))
+    dropDatabase("db10")
+  }
+
+  test("table exists") {
+    withTempDatabase { db =>
+      withTable(s"tbl_x", s"$db.tbl_y") {
+        // Try to find non existing tables.
+        assert(!spark.catalog.tableExists("tbl_x"))
+        assert(!spark.catalog.tableExists("tbl_y"))
+        assert(!spark.catalog.tableExists(db, "tbl_y"))
+
+        // Create objects.
+        createTempTable("tbl_x")
+        createTable("tbl_y", Some(db))
+
+        // Check a temporary table
+        assert(spark.catalog.tableExists("tbl_x"))
+
+        // Check a qualified table
+        assert(spark.catalog.tableExists(db, "tbl_y"))
+
+        // Check an unqualified table against the current database
+        assert(!spark.catalog.tableExists("tbl_y"))
+        spark.catalog.setCurrentDatabase(db)
+        assert(spark.catalog.tableExists("tbl_y"))
+      }
+    }
+  }
+
+  test("function exists") {
+    withTempDatabase { db =>
+      withUserDefinedFunction("fn1" -> true, s"$db.fn2" -> false) {
+        // Check that the functions do not exist yet.
+        assert(!spark.catalog.functionExists("fn1"))
+        assert(!spark.catalog.functionExists("fn2"))
+        assert(!spark.catalog.functionExists(db, "fn2"))
+
+        // Create objects.
+        createTempFunction("fn1")
+        createFunction("fn2", Some(db))
+
+        // Check a temporary function
+        assert(spark.catalog.functionExists("fn1"))
+
+        // Check a qualified function
+        assert(spark.catalog.functionExists(db, "fn2"))
+
+        // Check an unqualified function against the current database
+        assert(!spark.catalog.functionExists("fn2"))
+        spark.catalog.setCurrentDatabase(db)
+        assert(spark.catalog.functionExists("fn2"))
+      }
+    }
+  }
+
   // TODO: add tests for the rest of them
 
 }

