Posted to commits@spark.apache.org by rx...@apache.org on 2016/03/23 21:34:28 UTC
[2/2] spark git commit: [SPARK-14014][SQL] Replace existing catalog with SessionCatalog
[SPARK-14014][SQL] Replace existing catalog with SessionCatalog
## What changes were proposed in this pull request?
`SessionCatalog`, introduced in #11750, is a catalog that keeps track of temporary functions and tables, and delegates metastore operations to `ExternalCatalog`. Its functionality overlaps significantly with the existing `analysis.Catalog`.
As of this commit, `SessionCatalog` and `ExternalCatalog` will no longer be dead code. There are still things that need to be done after this patch, namely:
- SPARK-14013: Properly implement temporary functions in `SessionCatalog`
- SPARK-13879: Decide which DDL/DML commands to support natively in Spark
- SPARK-?????: Implement the ones we do want to support through `SessionCatalog`.
- SPARK-?????: Merge SQL/HiveContext
## How was this patch tested?
This is largely a refactoring task, so no new tests are introduced. The most relevant existing tests are `SessionCatalogSuite` and `ExternalCatalogSuite`.
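For illustration only (not part of this patch), here is a minimal sketch of the new wiring, using only constructors and methods that appear in the diff below; the table name and `Range` plan are placeholder values borrowed from the test suites:

```scala
import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.plans.logical.Range

// A SessionCatalog wraps an ExternalCatalog (here the in-memory implementation)
// and additionally tracks temporary tables and functions for the session.
val conf = new SimpleCatalystConf(caseSensitiveAnalysis = false)
val catalog = new SessionCatalog(new InMemoryCatalog, conf)

// Temporary tables are registered on the SessionCatalog itself
// ("tmp" and the Range plan are placeholders) ...
catalog.createTempTable("tmp", Range(1, 10, 1, 10, Seq()), ignoreIfExists = true)

// ... and resolved through the same lookup path the Analyzer now uses.
assert(catalog.tableExists(TableIdentifier("tmp")))
val plan = catalog.lookupRelation(TableIdentifier("tmp"))

// The Analyzer is now constructed against a SessionCatalog instead of analysis.Catalog.
val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
```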
Author: Andrew Or <an...@databricks.com>
Author: Yin Huai <yh...@databricks.com>
Closes #11836 from andrewor14/use-session-catalog.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5dfc0197
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5dfc0197
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5dfc0197
Branch: refs/heads/master
Commit: 5dfc01976bb0d72489620b4f32cc12d620bb6260
Parents: 6bc4be6
Author: Andrew Or <an...@databricks.com>
Authored: Wed Mar 23 13:34:22 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Mar 23 13:34:22 2016 -0700
----------------------------------------------------------------------
R/pkg/inst/tests/testthat/test_sparkSQL.R | 3 +-
project/MimaExcludes.scala | 3 +
python/pyspark/sql/context.py | 2 +-
.../spark/sql/catalyst/analysis/Analyzer.scala | 20 +-
.../spark/sql/catalyst/analysis/Catalog.scala | 218 --------
.../sql/catalyst/analysis/unresolved.scala | 2 +-
.../sql/catalyst/catalog/InMemoryCatalog.scala | 35 +-
.../sql/catalyst/catalog/SessionCatalog.scala | 123 +++--
.../spark/sql/catalyst/catalog/interface.scala | 2 +
.../sql/catalyst/analysis/AnalysisSuite.scala | 6 +-
.../sql/catalyst/analysis/AnalysisTest.scala | 23 +-
.../analysis/DecimalPrecisionSuite.scala | 25 +-
.../sql/catalyst/catalog/CatalogTestCases.scala | 3 +-
.../catalyst/catalog/SessionCatalogSuite.scala | 20 +-
.../optimizer/BooleanSimplificationSuite.scala | 11 +-
.../optimizer/EliminateSortsSuite.scala | 5 +-
.../scala/org/apache/spark/sql/SQLContext.scala | 73 ++-
.../spark/sql/execution/command/commands.scala | 8 +-
.../spark/sql/execution/datasources/ddl.scala | 24 +-
.../spark/sql/execution/datasources/rules.scala | 10 +-
.../spark/sql/internal/SessionState.scala | 7 +-
.../org/apache/spark/sql/ListTablesSuite.scala | 15 +-
.../org/apache/spark/sql/SQLContextSuite.scala | 9 +-
.../org/apache/spark/sql/SQLQuerySuite.scala | 22 +-
.../datasources/parquet/ParquetQuerySuite.scala | 6 +-
.../apache/spark/sql/test/SQLTestUtils.scala | 4 +-
.../hive/thriftserver/SparkSQLCLIDriver.scala | 3 +-
.../spark/sql/hive/thriftserver/CliSuite.scala | 5 +-
.../org/apache/spark/sql/hive/HiveCatalog.scala | 5 +-
.../org/apache/spark/sql/hive/HiveContext.scala | 498 ++++++++++---------
.../spark/sql/hive/HiveMetastoreCatalog.scala | 60 +--
.../spark/sql/hive/HiveSessionCatalog.scala | 104 ++++
.../spark/sql/hive/HiveSessionState.scala | 10 +-
.../spark/sql/hive/client/HiveClient.scala | 3 -
.../spark/sql/hive/client/HiveClientImpl.scala | 4 -
.../hive/execution/CreateTableAsSelect.scala | 4 +-
.../sql/hive/execution/CreateViewAsSelect.scala | 4 +-
.../hive/execution/InsertIntoHiveTable.scala | 14 +-
.../spark/sql/hive/execution/commands.scala | 9 +-
.../apache/spark/sql/hive/test/TestHive.scala | 151 ++++--
.../sql/hive/JavaMetastoreDataSourcesSuite.java | 5 +-
.../spark/sql/hive/HiveContextSuite.scala | 38 ++
.../sql/hive/HiveMetastoreCatalogSuite.scala | 9 +-
.../apache/spark/sql/hive/ListTablesSuite.scala | 6 +-
.../sql/hive/MetastoreDataSourcesSuite.scala | 31 +-
.../spark/sql/hive/MultiDatabaseSuite.scala | 5 +-
.../apache/spark/sql/hive/StatisticsSuite.scala | 3 +-
.../spark/sql/hive/client/VersionsSuite.scala | 4 -
.../sql/hive/execution/HiveQuerySuite.scala | 16 +-
.../sql/hive/execution/SQLQuerySuite.scala | 4 +-
.../spark/sql/hive/orc/OrcQuerySuite.scala | 4 +-
.../apache/spark/sql/hive/parquetSuites.scala | 24 +-
52 files changed, 919 insertions(+), 783 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 63acbad..eef365b 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1817,7 +1817,8 @@ test_that("approxQuantile() on a DataFrame", {
test_that("SQL error message is returned from JVM", {
retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e)
- expect_equal(grepl("Table not found: blah", retError), TRUE)
+ expect_equal(grepl("Table not found", retError), TRUE)
+ expect_equal(grepl("blah", retError), TRUE)
})
irisDF <- suppressWarnings(createDataFrame(sqlContext, iris))
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 42eafcb..9158983 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -563,6 +563,9 @@ object MimaExcludes {
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerEvent.logEvent"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.OutputWriterFactory.newInstance")
) ++ Seq(
+ // [SPARK-14014] Replace existing analysis.Catalog with SessionCatalog
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.this")
+ ) ++ Seq(
// [SPARK-13928] Move org.apache.spark.Logging into org.apache.spark.internal.Logging
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Logging"),
(problem: Problem) => problem match {
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/python/pyspark/sql/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 9c2f6a3..4008332 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -554,7 +554,7 @@ class SQLContext(object):
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> "table1" in sqlContext.tableNames()
True
- >>> "table1" in sqlContext.tableNames("db")
+ >>> "table1" in sqlContext.tableNames("default")
True
"""
if dbName is None:
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 5951a70..178e940 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf}
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.encoders.OuterScopes
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -36,23 +37,22 @@ import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.types._
/**
- * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
- * when all relations are already filled in and the analyzer needs only to resolve attribute
- * references.
+ * A trivial [[Analyzer]] with a dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]].
+ * Used for testing when all relations are already filled in and the analyzer needs only
+ * to resolve attribute references.
*/
object SimpleAnalyzer
- extends Analyzer(
- EmptyCatalog,
- EmptyFunctionRegistry,
- new SimpleCatalystConf(caseSensitiveAnalysis = true))
+ extends SimpleAnalyzer(new SimpleCatalystConf(caseSensitiveAnalysis = true))
+class SimpleAnalyzer(conf: CatalystConf)
+ extends Analyzer(new SessionCatalog(new InMemoryCatalog, conf), EmptyFunctionRegistry, conf)
/**
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
- * [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and
- * a [[FunctionRegistry]].
+ * [[UnresolvedRelation]]s into fully typed objects using information in a
+ * [[SessionCatalog]] and a [[FunctionRegistry]].
*/
class Analyzer(
- catalog: Catalog,
+ catalog: SessionCatalog,
registry: FunctionRegistry,
conf: CatalystConf,
maxIterations: Int = 100)
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
deleted file mode 100644
index 2f0a4db..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.analysis
-
-import java.util.concurrent.ConcurrentHashMap
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf, TableIdentifier}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-
-
-/**
- * An interface for looking up relations by name. Used by an [[Analyzer]].
- */
-trait Catalog {
-
- val conf: CatalystConf
-
- def tableExists(tableIdent: TableIdentifier): Boolean
-
- def lookupRelation(tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan
-
- def setCurrentDatabase(databaseName: String): Unit = {
- throw new UnsupportedOperationException
- }
-
- /**
- * Returns tuples of (tableName, isTemporary) for all tables in the given database.
- * isTemporary is a Boolean value indicates if a table is a temporary or not.
- */
- def getTables(databaseName: Option[String]): Seq[(String, Boolean)]
-
- def refreshTable(tableIdent: TableIdentifier): Unit
-
- def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit
-
- def unregisterTable(tableIdent: TableIdentifier): Unit
-
- def unregisterAllTables(): Unit
-
- /**
- * Get the table name of TableIdentifier for temporary tables.
- */
- protected def getTableName(tableIdent: TableIdentifier): String = {
- // It is not allowed to specify database name for temporary tables.
- // We check it here and throw exception if database is defined.
- if (tableIdent.database.isDefined) {
- throw new AnalysisException("Specifying database name or other qualifiers are not allowed " +
- "for temporary tables. If the table name has dots (.) in it, please quote the " +
- "table name with backticks (`).")
- }
- if (conf.caseSensitiveAnalysis) {
- tableIdent.table
- } else {
- tableIdent.table.toLowerCase
- }
- }
-}
-
-class SimpleCatalog(val conf: CatalystConf) extends Catalog {
- private[this] val tables = new ConcurrentHashMap[String, LogicalPlan]
-
- override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
- tables.put(getTableName(tableIdent), plan)
- }
-
- override def unregisterTable(tableIdent: TableIdentifier): Unit = {
- tables.remove(getTableName(tableIdent))
- }
-
- override def unregisterAllTables(): Unit = {
- tables.clear()
- }
-
- override def tableExists(tableIdent: TableIdentifier): Boolean = {
- tables.containsKey(getTableName(tableIdent))
- }
-
- override def lookupRelation(
- tableIdent: TableIdentifier,
- alias: Option[String] = None): LogicalPlan = {
- val tableName = getTableName(tableIdent)
- val table = tables.get(tableName)
- if (table == null) {
- throw new AnalysisException("Table not found: " + tableName)
- }
- val qualifiedTable = SubqueryAlias(tableName, table)
-
- // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
- // properly qualified with this alias.
- alias
- .map(a => SubqueryAlias(a, qualifiedTable))
- .getOrElse(qualifiedTable)
- }
-
- override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
- tables.keySet().asScala.map(_ -> true).toSeq
- }
-
- override def refreshTable(tableIdent: TableIdentifier): Unit = {
- throw new UnsupportedOperationException
- }
-}
-
-/**
- * A trait that can be mixed in with other Catalogs allowing specific tables to be overridden with
- * new logical plans. This can be used to bind query result to virtual tables, or replace tables
- * with in-memory cached versions. Note that the set of overrides is stored in memory and thus
- * lost when the JVM exits.
- */
-trait OverrideCatalog extends Catalog {
- private[this] val overrides = new ConcurrentHashMap[String, LogicalPlan]
-
- private def getOverriddenTable(tableIdent: TableIdentifier): Option[LogicalPlan] = {
- if (tableIdent.database.isDefined) {
- None
- } else {
- Option(overrides.get(getTableName(tableIdent)))
- }
- }
-
- abstract override def tableExists(tableIdent: TableIdentifier): Boolean = {
- getOverriddenTable(tableIdent) match {
- case Some(_) => true
- case None => super.tableExists(tableIdent)
- }
- }
-
- abstract override def lookupRelation(
- tableIdent: TableIdentifier,
- alias: Option[String] = None): LogicalPlan = {
- getOverriddenTable(tableIdent) match {
- case Some(table) =>
- val tableName = getTableName(tableIdent)
- val qualifiedTable = SubqueryAlias(tableName, table)
-
- // If an alias was specified by the lookup, wrap the plan in a sub-query so that attributes
- // are properly qualified with this alias.
- alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)
-
- case None => super.lookupRelation(tableIdent, alias)
- }
- }
-
- abstract override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
- overrides.keySet().asScala.map(_ -> true).toSeq ++ super.getTables(databaseName)
- }
-
- override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
- overrides.put(getTableName(tableIdent), plan)
- }
-
- override def unregisterTable(tableIdent: TableIdentifier): Unit = {
- if (tableIdent.database.isEmpty) {
- overrides.remove(getTableName(tableIdent))
- }
- }
-
- override def unregisterAllTables(): Unit = {
- overrides.clear()
- }
-}
-
-/**
- * A trivial catalog that returns an error when a relation is requested. Used for testing when all
- * relations are already filled in and the analyzer needs only to resolve attribute references.
- */
-object EmptyCatalog extends Catalog {
-
- override val conf: CatalystConf = EmptyConf
-
- override def tableExists(tableIdent: TableIdentifier): Boolean = {
- throw new UnsupportedOperationException
- }
-
- override def lookupRelation(
- tableIdent: TableIdentifier,
- alias: Option[String] = None): LogicalPlan = {
- throw new UnsupportedOperationException
- }
-
- override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
- throw new UnsupportedOperationException
- }
-
- override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
- throw new UnsupportedOperationException
- }
-
- override def unregisterTable(tableIdent: TableIdentifier): Unit = {
- throw new UnsupportedOperationException
- }
-
- override def unregisterAllTables(): Unit = {
- throw new UnsupportedOperationException
- }
-
- override def refreshTable(tableIdent: TableIdentifier): Unit = {
- throw new UnsupportedOperationException
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 9518309..e73d367 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -34,7 +34,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
errors.TreeNodeException(tree, s"Invalid call to $function on unresolved object", null)
/**
- * Holds the name of a relation that has yet to be looked up in a [[Catalog]].
+ * Holds the name of a relation that has yet to be looked up in a catalog.
*/
case class UnresolvedRelation(
tableIdentifier: TableIdentifier,
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 7ead1dd..e216fa5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -52,37 +52,34 @@ class InMemoryCatalog extends ExternalCatalog {
names.filter { funcName => regex.pattern.matcher(funcName).matches() }
}
- private def existsFunction(db: String, funcName: String): Boolean = {
+ private def functionExists(db: String, funcName: String): Boolean = {
requireDbExists(db)
catalog(db).functions.contains(funcName)
}
- private def existsTable(db: String, table: String): Boolean = {
- requireDbExists(db)
- catalog(db).tables.contains(table)
- }
-
- private def existsPartition(db: String, table: String, spec: TablePartitionSpec): Boolean = {
+ private def partitionExists(db: String, table: String, spec: TablePartitionSpec): Boolean = {
requireTableExists(db, table)
catalog(db).tables(table).partitions.contains(spec)
}
private def requireFunctionExists(db: String, funcName: String): Unit = {
- if (!existsFunction(db, funcName)) {
- throw new AnalysisException(s"Function '$funcName' does not exist in database '$db'")
+ if (!functionExists(db, funcName)) {
+ throw new AnalysisException(
+ s"Function not found: '$funcName' does not exist in database '$db'")
}
}
private def requireTableExists(db: String, table: String): Unit = {
- if (!existsTable(db, table)) {
- throw new AnalysisException(s"Table '$table' does not exist in database '$db'")
+ if (!tableExists(db, table)) {
+ throw new AnalysisException(
+ s"Table not found: '$table' does not exist in database '$db'")
}
}
private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = {
- if (!existsPartition(db, table, spec)) {
+ if (!partitionExists(db, table, spec)) {
throw new AnalysisException(
- s"Partition does not exist in database '$db' table '$table': '$spec'")
+ s"Partition not found: database '$db' table '$table' does not contain: '$spec'")
}
}
@@ -159,7 +156,7 @@ class InMemoryCatalog extends ExternalCatalog {
ignoreIfExists: Boolean): Unit = synchronized {
requireDbExists(db)
val table = tableDefinition.name.table
- if (existsTable(db, table)) {
+ if (tableExists(db, table)) {
if (!ignoreIfExists) {
throw new AnalysisException(s"Table '$table' already exists in database '$db'")
}
@@ -173,7 +170,7 @@ class InMemoryCatalog extends ExternalCatalog {
table: String,
ignoreIfNotExists: Boolean): Unit = synchronized {
requireDbExists(db)
- if (existsTable(db, table)) {
+ if (tableExists(db, table)) {
catalog(db).tables.remove(table)
} else {
if (!ignoreIfNotExists) {
@@ -200,13 +197,17 @@ class InMemoryCatalog extends ExternalCatalog {
catalog(db).tables(table).table
}
+ override def tableExists(db: String, table: String): Boolean = synchronized {
+ requireDbExists(db)
+ catalog(db).tables.contains(table)
+ }
+
override def listTables(db: String): Seq[String] = synchronized {
requireDbExists(db)
catalog(db).tables.keySet.toSeq
}
override def listTables(db: String, pattern: String): Seq[String] = synchronized {
- requireDbExists(db)
filterPattern(listTables(db), pattern)
}
@@ -295,7 +296,7 @@ class InMemoryCatalog extends ExternalCatalog {
override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
requireDbExists(db)
- if (existsFunction(db, func.name.funcName)) {
+ if (functionExists(db, func.name.funcName)) {
throw new AnalysisException(s"Function '$func' already exists in '$db' database")
} else {
catalog(db).functions.put(func.name.funcName, func)
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 3ac2bcf..34265fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
@@ -31,17 +32,34 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
* proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
* tables and functions of the Spark Session that it belongs to.
*/
-class SessionCatalog(externalCatalog: ExternalCatalog) {
+class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
import ExternalCatalog._
- private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
- private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
+ def this(externalCatalog: ExternalCatalog) {
+ this(externalCatalog, new SimpleCatalystConf(true))
+ }
+
+ protected[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
+ protected[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
// Note: we track current database here because certain operations do not explicitly
// specify the database (e.g. DROP TABLE my_table). In these cases we must first
// check whether the temporary table or function exists, then, if not, operate on
// the corresponding item in the current database.
- private[this] var currentDb = "default"
+ protected[this] var currentDb = {
+ val defaultName = "default"
+ val defaultDbDefinition = CatalogDatabase(defaultName, "default database", "", Map())
+ // Initialize default database if it doesn't already exist
+ createDatabase(defaultDbDefinition, ignoreIfExists = true)
+ defaultName
+ }
+
+ /**
+ * Format table name, taking into account case sensitivity.
+ */
+ protected[this] def formatTableName(name: String): String = {
+ if (conf.caseSensitiveAnalysis) name else name.toLowerCase
+ }
// ----------------------------------------------------------------------------
// Databases
@@ -105,8 +123,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
*/
def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
val db = tableDefinition.name.database.getOrElse(currentDb)
- val newTableDefinition = tableDefinition.copy(
- name = TableIdentifier(tableDefinition.name.table, Some(db)))
+ val table = formatTableName(tableDefinition.name.table)
+ val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db)))
externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
}
@@ -121,8 +139,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
*/
def alterTable(tableDefinition: CatalogTable): Unit = {
val db = tableDefinition.name.database.getOrElse(currentDb)
- val newTableDefinition = tableDefinition.copy(
- name = TableIdentifier(tableDefinition.name.table, Some(db)))
+ val table = formatTableName(tableDefinition.name.table)
+ val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db)))
externalCatalog.alterTable(db, newTableDefinition)
}
@@ -132,7 +150,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
*/
def getTable(name: TableIdentifier): CatalogTable = {
val db = name.database.getOrElse(currentDb)
- externalCatalog.getTable(db, name.table)
+ val table = formatTableName(name.table)
+ externalCatalog.getTable(db, table)
}
// -------------------------------------------------------------
@@ -146,10 +165,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
name: String,
tableDefinition: LogicalPlan,
ignoreIfExists: Boolean): Unit = {
- if (tempTables.containsKey(name) && !ignoreIfExists) {
+ val table = formatTableName(name)
+ if (tempTables.containsKey(table) && !ignoreIfExists) {
throw new AnalysisException(s"Temporary table '$name' already exists.")
}
- tempTables.put(name, tableDefinition)
+ tempTables.put(table, tableDefinition)
}
/**
@@ -166,11 +186,13 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
throw new AnalysisException("rename does not support moving tables across databases")
}
val db = oldName.database.getOrElse(currentDb)
- if (oldName.database.isDefined || !tempTables.containsKey(oldName.table)) {
- externalCatalog.renameTable(db, oldName.table, newName.table)
+ val oldTableName = formatTableName(oldName.table)
+ val newTableName = formatTableName(newName.table)
+ if (oldName.database.isDefined || !tempTables.containsKey(oldTableName)) {
+ externalCatalog.renameTable(db, oldTableName, newTableName)
} else {
- val table = tempTables.remove(oldName.table)
- tempTables.put(newName.table, table)
+ val table = tempTables.remove(oldTableName)
+ tempTables.put(newTableName, table)
}
}
@@ -183,10 +205,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
*/
def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = {
val db = name.database.getOrElse(currentDb)
- if (name.database.isDefined || !tempTables.containsKey(name.table)) {
- externalCatalog.dropTable(db, name.table, ignoreIfNotExists)
+ val table = formatTableName(name.table)
+ if (name.database.isDefined || !tempTables.containsKey(table)) {
+ externalCatalog.dropTable(db, table, ignoreIfNotExists)
} else {
- tempTables.remove(name.table)
+ tempTables.remove(table)
}
}
@@ -199,29 +222,44 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
*/
def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
val db = name.database.getOrElse(currentDb)
+ val table = formatTableName(name.table)
val relation =
- if (name.database.isDefined || !tempTables.containsKey(name.table)) {
- val metadata = externalCatalog.getTable(db, name.table)
+ if (name.database.isDefined || !tempTables.containsKey(table)) {
+ val metadata = externalCatalog.getTable(db, table)
CatalogRelation(db, metadata, alias)
} else {
- tempTables.get(name.table)
+ tempTables.get(table)
}
- val qualifiedTable = SubqueryAlias(name.table, relation)
+ val qualifiedTable = SubqueryAlias(table, relation)
// If an alias was specified by the lookup, wrap the plan in a subquery so that
// attributes are properly qualified with this alias.
alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)
}
/**
- * List all tables in the specified database, including temporary tables.
+ * Return whether a table with the specified name exists.
+ *
+ * Note: If a database is explicitly specified, then this will return whether the table
+ * exists in that particular database instead. In that case, even if there is a temporary
+ * table with the same name, we will return false if the specified database does not
+ * contain the table.
*/
- def listTables(db: String): Seq[TableIdentifier] = {
- val dbTables = externalCatalog.listTables(db).map { t => TableIdentifier(t, Some(db)) }
- val _tempTables = tempTables.keys().asScala.map { t => TableIdentifier(t) }
- dbTables ++ _tempTables
+ def tableExists(name: TableIdentifier): Boolean = {
+ val db = name.database.getOrElse(currentDb)
+ val table = formatTableName(name.table)
+ if (name.database.isDefined || !tempTables.containsKey(table)) {
+ externalCatalog.tableExists(db, table)
+ } else {
+ true // it's a temporary table
+ }
}
/**
+ * List all tables in the specified database, including temporary tables.
+ */
+ def listTables(db: String): Seq[TableIdentifier] = listTables(db, "*")
+
+ /**
* List all matching tables in the specified database, including temporary tables.
*/
def listTables(db: String, pattern: String): Seq[TableIdentifier] = {
@@ -235,6 +273,19 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
}
/**
+ * Refresh the cache entry for a metastore table, if any.
+ */
+ def refreshTable(name: TableIdentifier): Unit = { /* no-op */ }
+
+ /**
+ * Drop all existing temporary tables.
+ * For testing only.
+ */
+ def clearTempTables(): Unit = {
+ tempTables.clear()
+ }
+
+ /**
* Return a temporary table exactly as it was stored.
* For testing only.
*/
@@ -263,7 +314,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
val db = tableName.database.getOrElse(currentDb)
- externalCatalog.createPartitions(db, tableName.table, parts, ignoreIfExists)
+ val table = formatTableName(tableName.table)
+ externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
}
/**
@@ -275,7 +327,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
parts: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean): Unit = {
val db = tableName.database.getOrElse(currentDb)
- externalCatalog.dropPartitions(db, tableName.table, parts, ignoreIfNotExists)
+ val table = formatTableName(tableName.table)
+ externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists)
}
/**
@@ -289,7 +342,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = {
val db = tableName.database.getOrElse(currentDb)
- externalCatalog.renamePartitions(db, tableName.table, specs, newSpecs)
+ val table = formatTableName(tableName.table)
+ externalCatalog.renamePartitions(db, table, specs, newSpecs)
}
/**
@@ -303,7 +357,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
*/
def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
val db = tableName.database.getOrElse(currentDb)
- externalCatalog.alterPartitions(db, tableName.table, parts)
+ val table = formatTableName(tableName.table)
+ externalCatalog.alterPartitions(db, table, parts)
}
/**
@@ -312,7 +367,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
*/
def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
val db = tableName.database.getOrElse(currentDb)
- externalCatalog.getPartition(db, tableName.table, spec)
+ val table = formatTableName(tableName.table)
+ externalCatalog.getPartition(db, table, spec)
}
/**
@@ -321,7 +377,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
*/
def listPartitions(tableName: TableIdentifier): Seq[CatalogTablePartition] = {
val db = tableName.database.getOrElse(currentDb)
- externalCatalog.listPartitions(db, tableName.table)
+ val table = formatTableName(tableName.table)
+ externalCatalog.listPartitions(db, table)
}
// ----------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index c4e4961..3480313 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -91,6 +91,8 @@ abstract class ExternalCatalog {
def getTable(db: String, table: String): CatalogTable
+ def tableExists(db: String, table: String): Boolean
+
def listTables(db: String): Seq[String]
def listTables(db: String, pattern: String): Seq[String]
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 8b568b6..afc2f32 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -161,14 +161,10 @@ class AnalysisSuite extends AnalysisTest {
}
test("resolve relations") {
- assertAnalysisError(
- UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table not found: tAbLe"))
-
+ assertAnalysisError(UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq())
checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation)
-
checkAnalysis(
UnresolvedRelation(TableIdentifier("tAbLe"), None), testRelation, caseSensitive = false)
-
checkAnalysis(
UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation, caseSensitive = false)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index 39166c4..6fa4bee 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -18,26 +18,21 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier}
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
trait AnalysisTest extends PlanTest {
- val (caseSensitiveAnalyzer, caseInsensitiveAnalyzer) = {
- val caseSensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
- val caseInsensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = false)
+ protected val caseSensitiveAnalyzer = makeAnalyzer(caseSensitive = true)
+ protected val caseInsensitiveAnalyzer = makeAnalyzer(caseSensitive = false)
- val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
- val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)
-
- caseSensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation)
- caseInsensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation)
-
- new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf) {
- override val extendedResolutionRules = EliminateSubqueryAliases :: Nil
- } ->
- new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseInsensitiveConf) {
+ private def makeAnalyzer(caseSensitive: Boolean): Analyzer = {
+ val conf = new SimpleCatalystConf(caseSensitive)
+ val catalog = new SessionCatalog(new InMemoryCatalog, conf)
+ catalog.createTempTable("TaBlE", TestRelations.testRelation, ignoreIfExists = true)
+ new Analyzer(catalog, EmptyFunctionRegistry, conf) {
override val extendedResolutionRules = EliminateSubqueryAliases :: Nil
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index 9aa685e..3150186 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.analysis
import org.scalatest.BeforeAndAfter
-import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier}
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -30,11 +31,11 @@ import org.apache.spark.sql.types._
class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
- val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
- val catalog = new SimpleCatalog(conf)
- val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
+ private val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
+ private val catalog = new SessionCatalog(new InMemoryCatalog, conf)
+ private val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
- val relation = LocalRelation(
+ private val relation = LocalRelation(
AttributeReference("i", IntegerType)(),
AttributeReference("d1", DecimalType(2, 1))(),
AttributeReference("d2", DecimalType(5, 2))(),
@@ -43,15 +44,15 @@ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
AttributeReference("b", DoubleType)()
)
- val i: Expression = UnresolvedAttribute("i")
- val d1: Expression = UnresolvedAttribute("d1")
- val d2: Expression = UnresolvedAttribute("d2")
- val u: Expression = UnresolvedAttribute("u")
- val f: Expression = UnresolvedAttribute("f")
- val b: Expression = UnresolvedAttribute("b")
+ private val i: Expression = UnresolvedAttribute("i")
+ private val d1: Expression = UnresolvedAttribute("d1")
+ private val d2: Expression = UnresolvedAttribute("d2")
+ private val u: Expression = UnresolvedAttribute("u")
+ private val f: Expression = UnresolvedAttribute("f")
+ private val b: Expression = UnresolvedAttribute("b")
before {
- catalog.registerTable(TableIdentifier("table"), relation)
+ catalog.createTempTable("table", relation, ignoreIfExists = true)
}
private def checkType(expression: Expression, expectedType: DataType): Unit = {
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index a1ea619..277c2d7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -225,13 +225,14 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
test("list tables without pattern") {
val catalog = newBasicCatalog()
+ intercept[AnalysisException] { catalog.listTables("unknown_db") }
assert(catalog.listTables("db1").toSet == Set.empty)
assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
}
test("list tables with pattern") {
val catalog = newBasicCatalog()
- intercept[AnalysisException] { catalog.listTables("unknown_db") }
+ intercept[AnalysisException] { catalog.listTables("unknown_db", "*") }
assert(catalog.listTables("db1", "*").toSet == Set.empty)
assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2"))
assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2"))
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index e1973ee..74e995c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -397,6 +397,24 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl1", Some("db2")), alias = Some(alias)) == relationWithAlias)
}
+ test("table exists") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ assert(catalog.tableExists(TableIdentifier("tbl1", Some("db2"))))
+ assert(catalog.tableExists(TableIdentifier("tbl2", Some("db2"))))
+ assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
+ assert(!catalog.tableExists(TableIdentifier("tbl1", Some("db1"))))
+ assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1"))))
+ // If database is explicitly specified, do not check temporary tables
+ val tempTable = Range(1, 10, 1, 10, Seq())
+ catalog.createTempTable("tbl3", tempTable, ignoreIfExists = false)
+ assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
+ // If database is not explicitly specified, check the current database
+ catalog.setCurrentDatabase("db2")
+ assert(catalog.tableExists(TableIdentifier("tbl1")))
+ assert(catalog.tableExists(TableIdentifier("tbl2")))
+ assert(catalog.tableExists(TableIdentifier("tbl3")))
+ }
+
test("list tables without pattern") {
val catalog = new SessionCatalog(newBasicCatalog())
val tempTable = Range(1, 10, 2, 10, Seq())
@@ -429,7 +447,7 @@ class SessionCatalogSuite extends SparkFunSuite {
assert(catalog.listTables("db2", "*1").toSet ==
Set(TableIdentifier("tbl1"), TableIdentifier("tbl1", Some("db2"))))
intercept[AnalysisException] {
- catalog.listTables("unknown_db")
+ catalog.listTables("unknown_db", "*")
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index 2ab31ee..e2c76b7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
@@ -137,11 +138,11 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
checkCondition(!(('a || 'b) && ('c || 'd)), (!'a && !'b) || (!'c && !'d))
}
- private val caseInsensitiveAnalyzer =
- new Analyzer(
- EmptyCatalog,
- EmptyFunctionRegistry,
- new SimpleCatalystConf(caseSensitiveAnalysis = false))
+ private val caseInsensitiveConf = new SimpleCatalystConf(false)
+ private val caseInsensitiveAnalyzer = new Analyzer(
+ new SessionCatalog(new InMemoryCatalog, caseInsensitiveConf),
+ EmptyFunctionRegistry,
+ caseInsensitiveConf)
test("(a && b) || (a && c) => a && (b || c) when case insensitive") {
val plan = caseInsensitiveAnalyzer.execute(
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
index a4c8d1c..3824c67 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
@@ -18,7 +18,8 @@
package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.SimpleCatalystConf
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, SimpleCatalog}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
@@ -28,7 +29,7 @@ import org.apache.spark.sql.catalyst.rules._
class EliminateSortsSuite extends PlanTest {
val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false)
- val catalog = new SimpleCatalog(conf)
+ val catalog = new SessionCatalog(new InMemoryCatalog, conf)
val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
object Optimize extends RuleExecutor[LogicalPlan] {
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 853a74c..e413e77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -25,13 +25,14 @@ import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.reflect.runtime.universe.TypeTag
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalyst._
+import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range}
@@ -65,13 +66,14 @@ class SQLContext private[sql](
@transient val sparkContext: SparkContext,
@transient protected[sql] val cacheManager: CacheManager,
@transient private[sql] val listener: SQLListener,
- val isRootContext: Boolean)
+ val isRootContext: Boolean,
+ @transient private[sql] val externalCatalog: ExternalCatalog)
extends Logging with Serializable {
self =>
- def this(sparkContext: SparkContext) = {
- this(sparkContext, new CacheManager, SQLContext.createListenerAndUI(sparkContext), true)
+ def this(sc: SparkContext) = {
+ this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), true, new InMemoryCatalog)
}
def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
@@ -109,7 +111,8 @@ class SQLContext private[sql](
sparkContext = sparkContext,
cacheManager = cacheManager,
listener = listener,
- isRootContext = false)
+ isRootContext = false,
+ externalCatalog = externalCatalog)
}
/**
@@ -186,6 +189,12 @@ class SQLContext private[sql](
*/
def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
+ // Extract `spark.sql.*` entries and put it in our SQLConf.
+ // Subclasses may additionally set these entries in other confs.
+ SQLContext.getSQLProperties(sparkContext.getConf).asScala.foreach { case (k, v) =>
+ setConf(k, v)
+ }
+
protected[sql] def parseSql(sql: String): LogicalPlan = sessionState.sqlParser.parsePlan(sql)
protected[sql] def executeSql(sql: String): QueryExecution = executePlan(parseSql(sql))
@@ -199,30 +208,6 @@ class SQLContext private[sql](
sparkContext.addJar(path)
}
- {
- // We extract spark sql settings from SparkContext's conf and put them to
- // Spark SQL's conf.
- // First, we populate the SQLConf (conf). So, we can make sure that other values using
- // those settings in their construction can get the correct settings.
- // For example, metadataHive in HiveContext may need both spark.sql.hive.metastore.version
- // and spark.sql.hive.metastore.jars to get correctly constructed.
- val properties = new Properties
- sparkContext.getConf.getAll.foreach {
- case (key, value) if key.startsWith("spark.sql") => properties.setProperty(key, value)
- case _ =>
- }
- // We directly put those settings to conf to avoid of calling setConf, which may have
- // side-effects. For example, in HiveContext, setConf may cause executionHive and metadataHive
- // get constructed. If we call setConf directly, the constructed metadataHive may have
- // wrong settings, or the construction may fail.
- conf.setConf(properties)
- // After we have populated SQLConf, we call setConf to populate other confs in the subclass
- // (e.g. hiveconf in HiveContext).
- properties.asScala.foreach {
- case (key, value) => setConf(key, value)
- }
- }
-
/**
* :: Experimental ::
* A collection of methods that are considered experimental, but can be used to hook into
@@ -683,8 +668,10 @@ class SQLContext private[sql](
* only during the lifetime of this instance of SQLContext.
*/
private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
- sessionState.catalog.registerTable(
- sessionState.sqlParser.parseTableIdentifier(tableName), df.logicalPlan)
+ sessionState.catalog.createTempTable(
+ sessionState.sqlParser.parseTableIdentifier(tableName).table,
+ df.logicalPlan,
+ ignoreIfExists = true)
}
/**
@@ -697,7 +684,7 @@ class SQLContext private[sql](
*/
def dropTempTable(tableName: String): Unit = {
cacheManager.tryUncacheQuery(table(tableName))
- sessionState.catalog.unregisterTable(TableIdentifier(tableName))
+ sessionState.catalog.dropTable(TableIdentifier(tableName), ignoreIfNotExists = true)
}
/**
@@ -824,9 +811,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def tableNames(): Array[String] = {
- sessionState.catalog.getTables(None).map {
- case (tableName, _) => tableName
- }.toArray
+ tableNames(sessionState.catalog.getCurrentDatabase)
}
/**
@@ -836,9 +821,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def tableNames(databaseName: String): Array[String] = {
- sessionState.catalog.getTables(Some(databaseName)).map {
- case (tableName, _) => tableName
- }.toArray
+ sessionState.catalog.listTables(databaseName).map(_.table).toArray
}
@transient
@@ -1025,4 +1008,18 @@ object SQLContext {
}
sqlListener.get()
}
+
+ /**
+ * Extract `spark.sql.*` properties from the conf and return them as a [[Properties]].
+ */
+ private[sql] def getSQLProperties(sparkConf: SparkConf): Properties = {
+ val properties = new Properties
+ sparkConf.getAll.foreach { case (key, value) =>
+ if (key.startsWith("spark.sql")) {
+ properties.setProperty(key, value)
+ }
+ }
+ properties
+ }
+
}
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 59c3ffc..964f0a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -339,10 +339,12 @@ case class ShowTablesCommand(databaseName: Option[String]) extends RunnableComma
override def run(sqlContext: SQLContext): Seq[Row] = {
// Since we need to return a Seq of rows, we will call getTables directly
// instead of calling tables in sqlContext.
- val rows = sqlContext.sessionState.catalog.getTables(databaseName).map {
- case (tableName, isTemporary) => Row(tableName, isTemporary)
+ val catalog = sqlContext.sessionState.catalog
+ val db = databaseName.getOrElse(catalog.getCurrentDatabase)
+ val rows = catalog.listTables(db).map { t =>
+ val isTemp = t.database.isEmpty
+ Row(t.table, isTemp)
}
-
rows
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 9e8e035..24923bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -93,15 +93,21 @@ case class CreateTempTableUsing(
provider: String,
options: Map[String, String]) extends RunnableCommand {
+ if (tableIdent.database.isDefined) {
+ throw new AnalysisException(
+ s"Temporary table '$tableIdent' should not have specified a database")
+ }
+
def run(sqlContext: SQLContext): Seq[Row] = {
val dataSource = DataSource(
sqlContext,
userSpecifiedSchema = userSpecifiedSchema,
className = provider,
options = options)
- sqlContext.sessionState.catalog.registerTable(
- tableIdent,
- Dataset.ofRows(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan)
+ sqlContext.sessionState.catalog.createTempTable(
+ tableIdent.table,
+ Dataset.ofRows(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan,
+ ignoreIfExists = true)
Seq.empty[Row]
}
@@ -115,6 +121,11 @@ case class CreateTempTableUsingAsSelect(
options: Map[String, String],
query: LogicalPlan) extends RunnableCommand {
+ if (tableIdent.database.isDefined) {
+ throw new AnalysisException(
+ s"Temporary table '$tableIdent' should not have specified a database")
+ }
+
override def run(sqlContext: SQLContext): Seq[Row] = {
val df = Dataset.ofRows(sqlContext, query)
val dataSource = DataSource(
@@ -124,9 +135,10 @@ case class CreateTempTableUsingAsSelect(
bucketSpec = None,
options = options)
val result = dataSource.write(mode, df)
- sqlContext.sessionState.catalog.registerTable(
- tableIdent,
- Dataset.ofRows(sqlContext, LogicalRelation(result)).logicalPlan)
+ sqlContext.sessionState.catalog.createTempTable(
+ tableIdent.table,
+ Dataset.ofRows(sqlContext, LogicalRelation(result)).logicalPlan,
+ ignoreIfExists = true)
Seq.empty[Row]
}
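
Both commands now register temporary tables under their bare name, which is why a database
qualifier is rejected up front. A sketch of that path (the helper name is illustrative, and
require() stands in for the AnalysisException the commands themselves throw):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog.SessionCatalog
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Sketch: register an analyzed plan as a temporary table under its bare name.
    def registerTemp(catalog: SessionCatalog, ident: TableIdentifier, plan: LogicalPlan): Unit = {
      require(ident.database.isEmpty,
        s"Temporary table '$ident' should not have specified a database")
      catalog.createTempTable(ident.table, plan, ignoreIfExists = true)
    }
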
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 63f0e4f..28ac458 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -19,10 +19,12 @@ package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext}
import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrdering}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation, InsertableRelation}
/**
@@ -99,7 +101,9 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
/**
* A rule to do various checks before inserting into or writing to a data source table.
*/
-private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => Unit) {
+private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
+ extends (LogicalPlan => Unit) {
+
def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) }
def apply(plan: LogicalPlan): Unit = {
@@ -139,7 +143,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
}
PartitioningUtils.validatePartitionColumnDataTypes(
- r.schema, part.keySet.toSeq, catalog.conf.caseSensitiveAnalysis)
+ r.schema, part.keySet.toSeq, conf.caseSensitiveAnalysis)
// Get all input data source relations of the query.
val srcRelations = query.collect {
@@ -190,7 +194,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
}
PartitioningUtils.validatePartitionColumnDataTypes(
- c.child.schema, c.partitionColumns, catalog.conf.caseSensitiveAnalysis)
+ c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis)
for {
spec <- c.bucketSpec
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index e6be0ab..e5f02ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -18,7 +18,8 @@
package org.apache.spark.sql.internal
import org.apache.spark.sql.{ContinuousQueryManager, ExperimentalMethods, SQLContext, UDFRegistration}
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, Catalog, FunctionRegistry, SimpleCatalog}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -45,7 +46,7 @@ private[sql] class SessionState(ctx: SQLContext) {
/**
* Internal catalog for managing table and database states.
*/
- lazy val catalog: Catalog = new SimpleCatalog(conf)
+ lazy val catalog = new SessionCatalog(ctx.externalCatalog, conf)
/**
* Internal catalog for managing functions registered by the user.
@@ -68,7 +69,7 @@ private[sql] class SessionState(ctx: SQLContext) {
DataSourceAnalysis ::
(if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
- override val extendedCheckRules = Seq(datasources.PreWriteCheck(catalog))
+ override val extendedCheckRules = Seq(datasources.PreWriteCheck(conf, catalog))
}
}
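
The wiring above in one place, as a sketch: the shared ExternalCatalog plus the session's
SQLConf make a per-session SessionCatalog.

    import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, SessionCatalog}
    import org.apache.spark.sql.internal.SQLConf

    // Sketch of the per-session wiring: one ExternalCatalog shared across sessions,
    // one SessionCatalog per session, configured by that session's SQLConf.
    def buildCatalog(externalCatalog: ExternalCatalog, conf: SQLConf): SessionCatalog =
      new SessionCatalog(externalCatalog, conf)

Check rules that used to read the conf from the old Catalog, such as PreWriteCheck, now
receive the SQLConf as an explicit argument, i.e. PreWriteCheck(conf, catalog) above.
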
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
index 2820e4f..bb54c52 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
@@ -33,7 +33,8 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
}
after {
- sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
+ sqlContext.sessionState.catalog.dropTable(
+ TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true)
}
test("get all tables") {
@@ -45,20 +46,22 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
sql("SHOW tables").filter("tableName = 'ListTablesSuiteTable'"),
Row("ListTablesSuiteTable", true))
- sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
+ sqlContext.sessionState.catalog.dropTable(
+ TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true)
assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
}
- test("getting all Tables with a database name has no impact on returned table names") {
+ test("getting all tables with a database name has no impact on returned table names") {
checkAnswer(
- sqlContext.tables("DB").filter("tableName = 'ListTablesSuiteTable'"),
+ sqlContext.tables("default").filter("tableName = 'ListTablesSuiteTable'"),
Row("ListTablesSuiteTable", true))
checkAnswer(
- sql("show TABLES in DB").filter("tableName = 'ListTablesSuiteTable'"),
+ sql("show TABLES in default").filter("tableName = 'ListTablesSuiteTable'"),
Row("ListTablesSuiteTable", true))
- sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
+ sqlContext.sessionState.catalog.dropTable(
+ TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true)
assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
}
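
The cleanup the suites now share, as a one-line sketch: dropping a possibly absent temporary
table goes through the same dropTable call as any other table, and ignoreIfNotExists = true
makes it a no-op when the table is already gone.

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog.SessionCatalog

    // Sketch of the test cleanup: safe to call even if the table was never registered.
    def dropTempIfExists(catalog: SessionCatalog, name: String): Unit =
      catalog.dropTable(TableIdentifier(name), ignoreIfNotExists = true)
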
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index 2ad92b5..2f62ad4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
-class SQLContextSuite extends SparkFunSuite with SharedSparkContext{
+class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
object DummyRule extends Rule[LogicalPlan] {
def apply(p: LogicalPlan): LogicalPlan = p
@@ -78,4 +78,11 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext{
sqlContext.experimental.extraOptimizations = Seq(DummyRule)
assert(sqlContext.sessionState.optimizer.batches.flatMap(_.rules).contains(DummyRule))
}
+
+ test("SQLContext can access `spark.sql.*` configs") {
+ sc.conf.set("spark.sql.with.or.without.you", "my love")
+ val sqlContext = new SQLContext(sc)
+ assert(sqlContext.getConf("spark.sql.with.or.without.you") == "my love")
+ }
+
}
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 2733ae7..bd13474 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1397,12 +1397,16 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("SPARK-4699 case sensitivity SQL query") {
- sqlContext.setConf(SQLConf.CASE_SENSITIVE, false)
- val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil
- val rdd = sparkContext.parallelize((0 to 1).map(i => data(i)))
- rdd.toDF().registerTempTable("testTable1")
- checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1"))
- sqlContext.setConf(SQLConf.CASE_SENSITIVE, true)
+ val orig = sqlContext.getConf(SQLConf.CASE_SENSITIVE)
+ try {
+ sqlContext.setConf(SQLConf.CASE_SENSITIVE, false)
+ val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil
+ val rdd = sparkContext.parallelize((0 to 1).map(i => data(i)))
+ rdd.toDF().registerTempTable("testTable1")
+ checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1"))
+ } finally {
+ sqlContext.setConf(SQLConf.CASE_SENSITIVE, orig)
+ }
}
test("SPARK-6145: ORDER BY test for nested fields") {
@@ -1676,7 +1680,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
.format("parquet")
.save(path)
- val message = intercept[AnalysisException] {
+ // We don't support creating a temporary table while specifying a database
+ intercept[AnalysisException] {
sqlContext.sql(
s"""
|CREATE TEMPORARY TABLE db.t
@@ -1686,9 +1691,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
|)
""".stripMargin)
}.getMessage
- assert(message.contains("Specifying database name or other qualifiers are not allowed"))
- // If you use backticks to quote the name of a temporary table having dot in it.
+ // If you use backticks to quote the name then it's OK.
sqlContext.sql(
s"""
|CREATE TEMPORARY TABLE `db.t`
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index f8166c7..2f806eb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -51,7 +51,8 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
sql("INSERT INTO TABLE t SELECT * FROM tmp")
checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple))
}
- sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
+ sqlContext.sessionState.catalog.dropTable(
+ TableIdentifier("tmp"), ignoreIfNotExists = true)
}
test("overwriting") {
@@ -61,7 +62,8 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple))
}
- sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
+ sqlContext.sessionState.catalog.dropTable(
+ TableIdentifier("tmp"), ignoreIfNotExists = true)
}
test("self-join") {
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index d483585..80a85a6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -189,8 +189,8 @@ private[sql] trait SQLTestUtils
* `f` returns.
*/
protected def activateDatabase(db: String)(f: => Unit): Unit = {
- sqlContext.sql(s"USE $db")
- try f finally sqlContext.sql(s"USE default")
+ sqlContext.sessionState.catalog.setCurrentDatabase(db)
+ try f finally sqlContext.sessionState.catalog.setCurrentDatabase("default")
}
/**
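
A sketch of the same idea as a reusable helper (the name withDatabase is illustrative):
switch the session catalog's current database for the duration of a block and then restore
the previous one, instead of issuing USE statements.

    import org.apache.spark.sql.catalyst.catalog.SessionCatalog

    // Sketch: scope a block of work to another database via the session catalog.
    def withDatabase(catalog: SessionCatalog, db: String)(body: => Unit): Unit = {
      val previous = catalog.getCurrentDatabase
      catalog.setCurrentDatabase(db)
      try body finally catalog.setCurrentDatabase(previous)
    }
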
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 7fe31b0..5769328 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -150,7 +150,8 @@ private[hive] object SparkSQLCLIDriver extends Logging {
}
if (sessionState.database != null) {
- SparkSQLEnv.hiveContext.runSqlHive(s"USE ${sessionState.database}")
+ SparkSQLEnv.hiveContext.sessionState.catalog.setCurrentDatabase(
+ s"${sessionState.database}")
}
// Execute -i init files (always in silent mode)
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 032965d..8e1ebe2 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -193,10 +193,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
)
runCliWithin(2.minute, Seq("--database", "hive_test_db", "-e", "SHOW TABLES;"))(
- ""
- -> "OK",
- ""
- -> "hive_test"
+ "" -> "hive_test"
)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/5dfc0197/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
index 491f2ae..0722fb0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
@@ -85,7 +85,6 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit
withClient { getTable(db, table) }
}
-
// --------------------------------------------------------------------------
// Databases
// --------------------------------------------------------------------------
@@ -182,6 +181,10 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit
client.getTable(db, table)
}
+ override def tableExists(db: String, table: String): Boolean = withClient {
+ client.getTableOption(db, table).isDefined
+ }
+
override def listTables(db: String): Seq[String] = withClient {
requireDbExists(db)
client.listTables(db)
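
The new tableExists is answered through an Option-returning lookup rather than by calling
getTable and catching its failure. A sketch of that check against a hypothetical client
trait (HiveClient itself is Spark-internal, so the trait below only illustrates the shape
of the call):

    // Hypothetical stand-in for the internal HiveClient, just to show the check.
    trait MetastoreClient {
      def getTableOption(db: String, table: String): Option[AnyRef]
    }

    def tableExists(client: MetastoreClient, db: String, table: String): Boolean =
      client.getTableOption(db, table).isDefined
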