Posted to reviews@spark.apache.org by andrewor14 <gi...@git.apache.org> on 2016/03/16 03:50:12 UTC

[GitHub] spark pull request: [SPARK-13923] Implement SessionCatalog

GitHub user andrewor14 opened a pull request:

    https://github.com/apache/spark/pull/11750

    [SPARK-13923] Implement SessionCatalog

    ## What changes were proposed in this pull request?
    
    As part of the effort to merge `SQLContext` and `HiveContext`, this patch implements an internal catalog called `SessionCatalog` that handles temporary functions and tables and delegates metastore operations to `ExternalCatalog`. Currently, this is still dead code, but in the future it will be part of `SessionState` and will replace `o.a.s.sql.catalyst.analysis.Catalog`.
    
    A recent patch (#11573) has Spark parse Hive commands itself, but it still passes the entire query text to Hive. In a future patch, we will use `SessionCatalog` to implement the parsed commands.
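
    To make the delegation split concrete, here is a simplified sketch of the new class (the full version is in the diffs quoted below; only the shape matters here):

        import java.util.concurrent.ConcurrentHashMap
        import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

        // Temporary tables and functions live in session-local maps, while
        // metastore operations are delegated to the ExternalCatalog.
        class SessionCatalog(externalCatalog: ExternalCatalog) {
          private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]

          // Metastore operation: pure delegation to the underlying catalog.
          def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
            externalCatalog.createDatabase(dbDefinition, ignoreIfExists)
          }

          // Temporary table: session-local, never touches the metastore.
          def createTempTable(name: String, tableDefinition: LogicalPlan): Unit = {
            tempTables.put(name, tableDefinition)
          }
        }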
    
    ## How was this patch tested?
    
    700+ lines of tests in `SessionCatalogSuite`.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/andrewor14/spark temp-catalog

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/11750.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #11750
    
----
commit 4da28a57e65700b24887acb36330ca5d4ba5703b
Author: Andrew Or <an...@databricks.com>
Date:   2016-03-11T00:35:35Z

    Merge in @yhuai's changes

commit d09178cc5407f21be595151b258416f9b8fb0b77
Author: Andrew Or <an...@databricks.com>
Date:   2016-03-14T23:27:11Z

    Clean up table method signatures + add comments

commit 210b476f4c211652841fa791191db173c1f5a60f
Author: Andrew Or <an...@databricks.com>
Date:   2016-03-15T00:14:59Z

    Do the same for functions and partitions

commit 0a25e34773a4b4ba077c3546923a3b12e61064d0
Author: Andrew Or <an...@databricks.com>
Date:   2016-03-15T20:53:42Z

    Take into account current database in table methods

commit 19047521e6a95471641b13c8f3d64429b82e9c80
Author: Andrew Or <an...@databricks.com>
Date:   2016-03-15T21:22:37Z

    Refactor CatalogTable to use TableIdentifier
    
    This is a standalone commit such that in the future we can split
    it out into a separate patch if preferable.

commit db2755d150b737eae96f698fd3481f6f5f607c61
Author: Andrew Or <an...@databricks.com>
Date:   2016-03-15T21:33:20Z

    Refactor CatalogFunction to use FunctionIdentifier

commit 132e7fc51750a18e1aaf7dc780df7199088a1704
Author: Andrew Or <an...@databricks.com>
Date:   2016-03-15T21:38:50Z

    Fix tests

commit 3926738b393655188bc679030b55c14ed135014b
Author: Andrew Or <an...@databricks.com>
Date:   2016-03-15T21:39:22Z

    Document and clean up function methods

commit 76d24ad8806fd409c8f099c38b0781cea6ba3eb2
Author: Andrew Or <an...@databricks.com>
Date:   2016-03-15T22:32:38Z

    Implement SessionCatalog using ExternalCatalog

commit 8f6b67abd92c4866aaae3f742160b261239c50d7
Author: Andrew Or <an...@databricks.com>
Date:   2016-03-15T22:33:30Z

    Fix style

commit cd606e2cff446aaecff6b6cb74382ff3af35b474
Author: Andrew Or <an...@databricks.com>
Date:   2016-03-15T22:44:23Z

    Clean up duplicate code in Table/FunctionIdentifier

commit a5a4b0cd9b253f5297fc9c19568820c9626ea65e
Author: Andrew Or <an...@databricks.com>
Date:   2016-03-15T23:12:55Z

    Refactor CatalogTestCases to make methods accessible

commit 1b905bb2bf3bb70bd438b467352a890c1568d712
Author: Andrew Or <an...@databricks.com>
Date:   2016-03-15T23:59:38Z

    Fix infinite loop (woops)

commit a7951a9ab1ca9d008572b82431afab96232d5a58
Author: Andrew Or <an...@databricks.com>
Date:   2016-03-16T01:52:30Z

    Implement tests for databases and tables

commit f33c8ef44fdd38ca9e5c35b421e1ddb8d93f1de6
Author: Andrew Or <an...@databricks.com>
Date:   2016-03-16T02:04:03Z

    Implement tests for table partitions

commit 594390bcbfb6d806417dceedb2c24091950dd7c9
Author: Andrew Or <an...@databricks.com>
Date:   2016-03-16T02:41:30Z

    Implement tests for functions

commit 3b2e48a4efa81bb886331ce5818e2d44c7c2f7da
Author: Andrew Or <an...@databricks.com>
Date:   2016-03-16T02:42:22Z

    Add TODO

----




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11750#issuecomment-197625831
  
    **[Test build #2646 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2646/consoleFull)** for PR 11750 at commit [`ad43a5f`](https://github.com/apache/spark/commit/ad43a5ffdeeb881aaed8944971b63a27d1f4257f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11750#discussion_r56386952
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---
    @@ -0,0 +1,484 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.catalog
    +
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +
    +
    +/**
    + * An internal catalog that is used by a Spark Session. This internal catalog serves as a
    + * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
    + * tables and functions of the Spark Session that it belongs to.
    + */
    +class SessionCatalog(externalCatalog: ExternalCatalog) {
    +  import ExternalCatalog._
    +
    +  private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
    +
    +  private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
    +
    +  // ----------------------------------------------------------------------------
    +  // Databases
    +  // ----------------------------------------------------------------------------
    +  // All methods in this category interact directly with the underlying catalog.
    +  // ----------------------------------------------------------------------------
    +
    +  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
    +    externalCatalog.createDatabase(dbDefinition, ignoreIfExists)
    +  }
    +
    +  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
    +    externalCatalog.dropDatabase(db, ignoreIfNotExists, cascade)
    +  }
    +
    +  def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
    +    externalCatalog.alterDatabase(dbDefinition)
    +  }
    +
    +  def getDatabase(db: String): CatalogDatabase = {
    +    externalCatalog.getDatabase(db)
    +  }
    +
    +  def databaseExists(db: String): Boolean = {
    +    externalCatalog.databaseExists(db)
    +  }
    +
    +  def listDatabases(): Seq[String] = {
    +    externalCatalog.listDatabases()
    +  }
    +
    +  def listDatabases(pattern: String): Seq[String] = {
    +    externalCatalog.listDatabases(pattern)
    +  }
    +
    +  // ----------------------------------------------------------------------------
    +  // Tables
    +  // ----------------------------------------------------------------------------
    +  // There are two kinds of tables, temporary tables and metastore tables.
    +  // Temporary tables are isolated across sessions and do not belong to any
    +  // particular database. Metastore tables can be used across multiple
    +  // sessions as their metadata is persisted in the underlying catalog.
    +  // ----------------------------------------------------------------------------
    +
    +  // ----------------------------------------------------
    +  // | Methods that interact with metastore tables only |
    +  // ----------------------------------------------------
    +
    +  /**
    +   * Create a metastore table in the database specified in `tableDefinition`.
    +   * If no such database is specified, create it in the current database.
    +   */
    +  def createTable(
    +      currentDb: String,
    --- End diff --
    
    As discussed offline, let's move the tracking of the current db into SessionCatalog.
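
    A minimal sketch of the resulting shape (the later revision quoted further down in this thread does essentially this):

        class SessionCatalog(externalCatalog: ExternalCatalog) {
          // Tracked here because commands like DROP TABLE my_table omit the
          // database; callers no longer pass currentDb into every method.
          private[this] var currentDb = "default"

          def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
            val db = tableDefinition.name.database.getOrElse(currentDb)
            externalCatalog.createTable(db, tableDefinition, ignoreIfExists)
          }
        }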




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11750#discussion_r56421080
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---
    @@ -167,7 +170,7 @@ abstract class ExternalCatalog {
      * @param name name of the function
      * @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
      */
    -case class CatalogFunction(name: String, className: String)
    +case class CatalogFunction(name: FunctionIdentifier, className: String)
    --- End diff --
    
    `catalogFunc.name.funcName` is kind of weird (we do not need to change it right now). 
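
    For context, the chain arises because the bare name now lives one level down inside the identifier (a sketch of the types; the function and class names below are made up):

        case class FunctionIdentifier(funcName: String, database: Option[String] = None)
        case class CatalogFunction(name: FunctionIdentifier, className: String)

        val func = CatalogFunction(FunctionIdentifier("myUpper"), "com.example.MyUpper")
        func.name.funcName  // "myUpper" -- the double step in question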




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11750#discussion_r56424468
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---
    @@ -0,0 +1,469 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.catalog
    +
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +
    +
    +/**
    + * An internal catalog that is used by a Spark Session. This internal catalog serves as a
    + * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
    + * tables and functions of the Spark Session that it belongs to.
    + */
    +class SessionCatalog(externalCatalog: ExternalCatalog) {
    +  import ExternalCatalog._
    +
    +  private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
    +  private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
    +
    +  // Note: we track current database here because certain operations do not explicitly
    +  // specify the database (e.g. DROP TABLE my_table). In these cases we must first
    +  // check whether the temporary table or function exists, then, if not, operate on
    +  // the corresponding item in the current database.
    +  private[this] var currentDb = "default"
    +
    +  // ----------------------------------------------------------------------------
    +  // Databases
    +  // ----------------------------------------------------------------------------
    +  // All methods in this category interact directly with the underlying catalog.
    +  // ----------------------------------------------------------------------------
    +
    +  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
    +    externalCatalog.createDatabase(dbDefinition, ignoreIfExists)
    +  }
    +
    +  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
    +    externalCatalog.dropDatabase(db, ignoreIfNotExists, cascade)
    +  }
    +
    +  def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
    +    externalCatalog.alterDatabase(dbDefinition)
    +  }
    +
    +  def getDatabase(db: String): CatalogDatabase = {
    +    externalCatalog.getDatabase(db)
    +  }
    +
    +  def databaseExists(db: String): Boolean = {
    +    externalCatalog.databaseExists(db)
    +  }
    +
    +  def listDatabases(): Seq[String] = {
    +    externalCatalog.listDatabases()
    +  }
    +
    +  def listDatabases(pattern: String): Seq[String] = {
    +    externalCatalog.listDatabases(pattern)
    +  }
    +
    +  def getCurrentDatabase: String = currentDb
    +
    +  def setCurrentDatabase(db: String): Unit = {
    +    if (!databaseExists(db)) {
    +      throw new AnalysisException(s"cannot set current database to non-existent '$db'")
    +    }
    +    currentDb = db
    +  }
    +
    +  // ----------------------------------------------------------------------------
    +  // Tables
    +  // ----------------------------------------------------------------------------
    +  // There are two kinds of tables, temporary tables and metastore tables.
    +  // Temporary tables are isolated across sessions and do not belong to any
    +  // particular database. Metastore tables can be used across multiple
    +  // sessions as their metadata is persisted in the underlying catalog.
    +  // ----------------------------------------------------------------------------
    +
    +  // ----------------------------------------------------
    +  // | Methods that interact with metastore tables only |
    +  // ----------------------------------------------------
    +
    +  /**
    +   * Create a metastore table in the database specified in `tableDefinition`.
    +   * If no such database is specified, create it in the current database.
    +   */
    +  def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
    +    val db = tableDefinition.name.database.getOrElse(currentDb)
    +    val newTableDefinition = tableDefinition.copy(
    +      name = TableIdentifier(tableDefinition.name.table, Some(db)))
    +    externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
    +  }
    +
    +  /**
    +   * Alter the metadata of an existing metastore table identified by `tableDefinition`.
    +   *
    +   * If no database is specified in `tableDefinition`, assume the table is in the
    +   * current database.
    +   *
    +   * Note: If the underlying implementation does not support altering a certain field,
    +   * this becomes a no-op.
    +   */
    +  def alterTable(tableDefinition: CatalogTable): Unit = {
    +    val db = tableDefinition.name.database.getOrElse(currentDb)
    +    val newTableDefinition = tableDefinition.copy(
    +      name = TableIdentifier(tableDefinition.name.table, Some(db)))
    +    externalCatalog.alterTable(db, newTableDefinition)
    +  }
    +
    +  /**
    +   * Retrieve the metadata of an existing metastore table.
    +   * If no database is specified, assume the table is in the current database.
    +   */
    +  def getTable(name: TableIdentifier): CatalogTable = {
    +    val db = name.database.getOrElse(currentDb)
    +    externalCatalog.getTable(db, name.table)
    +  }
    +
    +  // -------------------------------------------------------------
    +  // | Methods that interact with temporary and metastore tables |
    +  // -------------------------------------------------------------
    +
    +  /**
    +   * Create a temporary table.
    +   */
    +  def createTempTable(
    +      name: String,
    +      tableDefinition: LogicalPlan,
    +      ignoreIfExists: Boolean): Unit = {
    +    if (tempTables.containsKey(name) && !ignoreIfExists) {
    +      throw new AnalysisException(s"Temporary table '$name' already exists.")
    +    }
    +    tempTables.put(name, tableDefinition)
    +  }
    +
    +  /**
    +   * Rename a table.
    +   *
    +   * If a database is specified in `oldName`, this will rename the table in that database.
    +   * If no database is specified, this will first attempt to rename a temporary table with
    +   * the same name, then, if that does not exist, rename the table in the current database.
    +   *
    +   * This assumes the database specified in `oldName` matches the one specified in `newName`.
    +   */
    +  def renameTable(oldName: TableIdentifier, newName: TableIdentifier): Unit = {
    +    if (oldName.database != newName.database) {
    +      throw new AnalysisException("rename does not support moving tables across databases")
    +    }
    +    val db = oldName.database.getOrElse(currentDb)
    +    if (oldName.database.isDefined || !tempTables.containsKey(oldName.table)) {
    +      externalCatalog.renameTable(db, oldName.table, newName.table)
    +    } else {
    +      val table = tempTables.remove(oldName.table)
    +      tempTables.put(newName.table, table)
    +    }
    +  }
    +
    +  /**
    +   * Drop a table.
    +   *
    +   * If a database is specified in `name`, this will drop the table from that database.
    +   * If no database is specified, this will first attempt to drop a temporary table with
    +   * the same name, then, if that does not exist, drop the table from the current database.
    +   */
    +  def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = {
    --- End diff --
    
    Just a note here: if we have a temp table called `T1` and also a metastore table called `T1` in the current db, this dropTable command will drop the temp table `T1`.
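
    A sketch of the resulting behavior (the table name and plan below are made up):

        // Both a temp table and a metastore table are named "t1".
        catalog.createTempTable("t1", somePlan, ignoreIfExists = false)

        // Unqualified: the temp table is found first, so this drops it.
        catalog.dropTable(TableIdentifier("t1"), ignoreIfNotExists = false)

        // Qualified: bypasses temp tables, drops the metastore copy instead.
        catalog.dropTable(TableIdentifier("t1", Some("default")), ignoreIfNotExists = false)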




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11750#discussion_r56437967
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---
    @@ -0,0 +1,469 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.catalog
    +
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +
    +
    +/**
    + * An internal catalog that is used by a Spark Session. This internal catalog serves as a
    + * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
    + * tables and functions of the Spark Session that it belongs to.
    + */
    +class SessionCatalog(externalCatalog: ExternalCatalog) {
    +  import ExternalCatalog._
    +
    +  private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
    --- End diff --
    
    OK. This thing is not thread-safe right now, and making these maps concurrent is not sufficient.
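
    The core issue is the check-then-act in createTempTable: containsKey followed by put is not atomic even on a ConcurrentHashMap, so two concurrent callers can both pass the check. A sketch of one way to close that particular window (it does not help compound operations like renameTable, which span two map calls):

        def createTempTable(
            name: String,
            tableDefinition: LogicalPlan,
            ignoreIfExists: Boolean): Unit = {
          if (ignoreIfExists) {
            tempTables.put(name, tableDefinition)
          } else {
            // putIfAbsent is atomic; a non-null result means the name was taken.
            if (tempTables.putIfAbsent(name, tableDefinition) != null) {
              throw new AnalysisException(s"Temporary table '$name' already exists.")
            }
          }
        }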




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11750#discussion_r56436672
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---
    @@ -0,0 +1,469 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.catalog
    +
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +
    +
    +/**
    + * An internal catalog that is used by a Spark Session. This internal catalog serves as a
    + * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
    + * tables and functions of the Spark Session that it belongs to.
    + */
    +class SessionCatalog(externalCatalog: ExternalCatalog) {
    +  import ExternalCatalog._
    +
    +  private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
    +  private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
    +
    +  // Note: we track current database here because certain operations do not explicitly
    +  // specify the database (e.g. DROP TABLE my_table). In these cases we must first
    +  // check whether the temporary table or function exists, then, if not, operate on
    +  // the corresponding item in the current database.
    +  private[this] var currentDb = "default"
    +
    +  // ----------------------------------------------------------------------------
    +  // Databases
    +  // ----------------------------------------------------------------------------
    +  // All methods in this category interact directly with the underlying catalog.
    +  // ----------------------------------------------------------------------------
    +
    +  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
    +    externalCatalog.createDatabase(dbDefinition, ignoreIfExists)
    +  }
    +
    +  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
    +    externalCatalog.dropDatabase(db, ignoreIfNotExists, cascade)
    +  }
    +
    +  def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
    +    externalCatalog.alterDatabase(dbDefinition)
    +  }
    +
    +  def getDatabase(db: String): CatalogDatabase = {
    +    externalCatalog.getDatabase(db)
    +  }
    +
    +  def databaseExists(db: String): Boolean = {
    +    externalCatalog.databaseExists(db)
    +  }
    +
    +  def listDatabases(): Seq[String] = {
    +    externalCatalog.listDatabases()
    +  }
    +
    +  def listDatabases(pattern: String): Seq[String] = {
    +    externalCatalog.listDatabases(pattern)
    +  }
    +
    +  def getCurrentDatabase: String = currentDb
    +
    +  def setCurrentDatabase(db: String): Unit = {
    +    if (!databaseExists(db)) {
    +      throw new AnalysisException(s"cannot set current database to non-existent '$db'")
    +    }
    +    currentDb = db
    +  }
    +
    +  // ----------------------------------------------------------------------------
    +  // Tables
    +  // ----------------------------------------------------------------------------
    +  // There are two kinds of tables, temporary tables and metastore tables.
    +  // Temporary tables are isolated across sessions and do not belong to any
    +  // particular database. Metastore tables can be used across multiple
    +  // sessions as their metadata is persisted in the underlying catalog.
    +  // ----------------------------------------------------------------------------
    +
    +  // ----------------------------------------------------
    +  // | Methods that interact with metastore tables only |
    +  // ----------------------------------------------------
    +
    +  /**
    +   * Create a metastore table in the database specified in `tableDefinition`.
    +   * If no such database is specified, create it in the current database.
    +   */
    +  def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
    +    val db = tableDefinition.name.database.getOrElse(currentDb)
    +    val newTableDefinition = tableDefinition.copy(
    +      name = TableIdentifier(tableDefinition.name.table, Some(db)))
    +    externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
    +  }
    +
    +  /**
    +   * Alter the metadata of an existing metastore table identified by `tableDefinition`.
    +   *
    +   * If no database is specified in `tableDefinition`, assume the table is in the
    +   * current database.
    +   *
    +   * Note: If the underlying implementation does not support altering a certain field,
    +   * this becomes a no-op.
    +   */
    +  def alterTable(tableDefinition: CatalogTable): Unit = {
    +    val db = tableDefinition.name.database.getOrElse(currentDb)
    +    val newTableDefinition = tableDefinition.copy(
    +      name = TableIdentifier(tableDefinition.name.table, Some(db)))
    +    externalCatalog.alterTable(db, newTableDefinition)
    +  }
    +
    +  /**
    +   * Retrieve the metadata of an existing metastore table.
    +   * If no database is specified, assume the table is in the current database.
    +   */
    +  def getTable(name: TableIdentifier): CatalogTable = {
    +    val db = name.database.getOrElse(currentDb)
    +    externalCatalog.getTable(db, name.table)
    +  }
    +
    +  // -------------------------------------------------------------
    +  // | Methods that interact with temporary and metastore tables |
    +  // -------------------------------------------------------------
    +
    +  /**
    +   * Create a temporary table.
    +   */
    +  def createTempTable(
    +      name: String,
    +      tableDefinition: LogicalPlan,
    +      ignoreIfExists: Boolean): Unit = {
    +    if (tempTables.containsKey(name) && !ignoreIfExists) {
    +      throw new AnalysisException(s"Temporary table '$name' already exists.")
    +    }
    +    tempTables.put(name, tableDefinition)
    --- End diff --
    
    this isn't really ignoreIfExists but updateIfExists?
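
    As written, a second call with ignoreIfExists = true silently replaces the stored plan rather than leaving it alone. A sketch of true ignore-if-exists semantics (also atomic, per the thread-safety point above):

        def createTempTable(
            name: String,
            tableDefinition: LogicalPlan,
            ignoreIfExists: Boolean): Unit = {
          val previous = tempTables.putIfAbsent(name, tableDefinition)
          if (previous != null && !ignoreIfExists) {
            throw new AnalysisException(s"Temporary table '$name' already exists.")
          }
          // If the name was taken and ignoreIfExists is set, keep the old plan.
        }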




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11750#issuecomment-197596671
  
    **[Test build #2646 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2646/consoleFull)** for PR 11750 at commit [`ad43a5f`](https://github.com/apache/spark/commit/ad43a5ffdeeb881aaed8944971b63a27d1f4257f).




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11750#discussion_r56394213
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---
    @@ -0,0 +1,484 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.catalog
    +
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +
    +
    +/**
    + * An internal catalog that is used by a Spark Session. This internal catalog serves as a
    + * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
    + * tables and functions of the Spark Session that it belongs to.
    + */
    +class SessionCatalog(externalCatalog: ExternalCatalog) {
    +  import ExternalCatalog._
    +
    +  private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
    +
    +  private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
    +
    +  // ----------------------------------------------------------------------------
    +  // Databases
    +  // ----------------------------------------------------------------------------
    +  // All methods in this category interact directly with the underlying catalog.
    +  // ----------------------------------------------------------------------------
    +
    +  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
    +    externalCatalog.createDatabase(dbDefinition, ignoreIfExists)
    +  }
    +
    +  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
    +    externalCatalog.dropDatabase(db, ignoreIfNotExists, cascade)
    +  }
    +
    +  def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
    +    externalCatalog.alterDatabase(dbDefinition)
    +  }
    +
    +  def getDatabase(db: String): CatalogDatabase = {
    +    externalCatalog.getDatabase(db)
    +  }
    +
    +  def databaseExists(db: String): Boolean = {
    +    externalCatalog.databaseExists(db)
    +  }
    +
    +  def listDatabases(): Seq[String] = {
    +    externalCatalog.listDatabases()
    +  }
    +
    +  def listDatabases(pattern: String): Seq[String] = {
    +    externalCatalog.listDatabases(pattern)
    +  }
    +
    +  // ----------------------------------------------------------------------------
    +  // Tables
    +  // ----------------------------------------------------------------------------
    +  // There are two kinds of tables, temporary tables and metastore tables.
    +  // Temporary tables are isolated across sessions and do not belong to any
    +  // particular database. Metastore tables can be used across multiple
    +  // sessions as their metadata is persisted in the underlying catalog.
    +  // ----------------------------------------------------------------------------
    +
    +  // ----------------------------------------------------
    +  // | Methods that interact with metastore tables only |
    +  // ----------------------------------------------------
    +
    +  /**
    +   * Create a metastore table in the database specified in `tableDefinition`.
    +   * If no such database is specified, create it in the current database.
    +   */
    +  def createTable(
    +      currentDb: String,
    --- End diff --
    
    Make sure we document that in the session catalog code and explain why we are tracking the current db here.





[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11750#issuecomment-197131127
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53263/
    Test FAILed.




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11750#discussion_r56436908
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---
    @@ -0,0 +1,469 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.catalog
    +
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +
    +
    +/**
    + * An internal catalog that is used by a Spark Session. This internal catalog serves as a
    + * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
    + * tables and functions of the Spark Session that it belongs to.
    + */
    +class SessionCatalog(externalCatalog: ExternalCatalog) {
    +  import ExternalCatalog._
    +
    +  private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
    +  private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
    +
    +  // Note: we track current database here because certain operations do not explicitly
    +  // specify the database (e.g. DROP TABLE my_table). In these cases we must first
    +  // check whether the temporary table or function exists, then, if not, operate on
    +  // the corresponding item in the current database.
    +  private[this] var currentDb = "default"
    +
    +  // ----------------------------------------------------------------------------
    +  // Databases
    +  // ----------------------------------------------------------------------------
    +  // All methods in this category interact directly with the underlying catalog.
    +  // ----------------------------------------------------------------------------
    +
    +  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
    +    externalCatalog.createDatabase(dbDefinition, ignoreIfExists)
    +  }
    +
    +  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
    +    externalCatalog.dropDatabase(db, ignoreIfNotExists, cascade)
    +  }
    +
    +  def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
    +    externalCatalog.alterDatabase(dbDefinition)
    +  }
    +
    +  def getDatabase(db: String): CatalogDatabase = {
    +    externalCatalog.getDatabase(db)
    +  }
    +
    +  def databaseExists(db: String): Boolean = {
    +    externalCatalog.databaseExists(db)
    +  }
    +
    +  def listDatabases(): Seq[String] = {
    +    externalCatalog.listDatabases()
    +  }
    +
    +  def listDatabases(pattern: String): Seq[String] = {
    +    externalCatalog.listDatabases(pattern)
    +  }
    +
    +  def getCurrentDatabase: String = currentDb
    +
    +  def setCurrentDatabase(db: String): Unit = {
    +    if (!databaseExists(db)) {
    +      throw new AnalysisException(s"cannot set current database to non-existent '$db'")
    +    }
    +    currentDb = db
    +  }
    +
    +  // ----------------------------------------------------------------------------
    +  // Tables
    +  // ----------------------------------------------------------------------------
    +  // There are two kinds of tables, temporary tables and metastore tables.
    +  // Temporary tables are isolated across sessions and do not belong to any
    +  // particular database. Metastore tables can be used across multiple
    +  // sessions as their metadata is persisted in the underlying catalog.
    +  // ----------------------------------------------------------------------------
    +
    +  // ----------------------------------------------------
    +  // | Methods that interact with metastore tables only |
    +  // ----------------------------------------------------
    +
    +  /**
    +   * Create a metastore table in the database specified in `tableDefinition`.
    +   * If no such database is specified, create it in the current database.
    +   */
    +  def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
    +    val db = tableDefinition.name.database.getOrElse(currentDb)
    +    val newTableDefinition = tableDefinition.copy(
    +      name = TableIdentifier(tableDefinition.name.table, Some(db)))
    +    externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
    +  }
    +
    +  /**
    +   * Alter the metadata of an existing metastore table identified by `tableDefinition`.
    +   *
    +   * If no database is specified in `tableDefinition`, assume the table is in the
    +   * current database.
    +   *
    +   * Note: If the underlying implementation does not support altering a certain field,
    +   * this becomes a no-op.
    +   */
    +  def alterTable(tableDefinition: CatalogTable): Unit = {
    +    val db = tableDefinition.name.database.getOrElse(currentDb)
    +    val newTableDefinition = tableDefinition.copy(
    +      name = TableIdentifier(tableDefinition.name.table, Some(db)))
    +    externalCatalog.alterTable(db, newTableDefinition)
    +  }
    +
    +  /**
    +   * Retrieve the metadata of an existing metastore table.
    +   * If no database is specified, assume the table is in the current database.
    +   */
    +  def getTable(name: TableIdentifier): CatalogTable = {
    +    val db = name.database.getOrElse(currentDb)
    +    externalCatalog.getTable(db, name.table)
    +  }
    +
    +  // -------------------------------------------------------------
    +  // | Methods that interact with temporary and metastore tables |
    +  // -------------------------------------------------------------
    +
    +  /**
    +   * Create a temporary table.
    +   */
    +  def createTempTable(
    +      name: String,
    +      tableDefinition: LogicalPlan,
    +      ignoreIfExists: Boolean): Unit = {
    +    if (tempTables.containsKey(name) && !ignoreIfExists) {
    +      throw new AnalysisException(s"Temporary table '$name' already exists.")
    +    }
    +    tempTables.put(name, tableDefinition)
    +  }
    +
    +  /**
    +   * Rename a table.
    +   *
    +   * If a database is specified in `oldName`, this will rename the table in that database.
    +   * If no database is specified, this will first attempt to rename a temporary table with
    +   * the same name, then, if that does not exist, rename the table in the current database.
    +   *
    +   * This assumes the database specified in `oldName` matches the one specified in `newName`.
    +   */
    +  def renameTable(oldName: TableIdentifier, newName: TableIdentifier): Unit = {
    +    if (oldName.database != newName.database) {
    +      throw new AnalysisException("rename does not support moving tables across databases")
    +    }
    +    val db = oldName.database.getOrElse(currentDb)
    +    if (oldName.database.isDefined || !tempTables.containsKey(oldName.table)) {
    +      externalCatalog.renameTable(db, oldName.table, newName.table)
    +    } else {
    +      val table = tempTables.remove(oldName.table)
    +      tempTables.put(newName.table, table)
    +    }
    +  }
    +
    +  /**
    +   * Drop a table.
    +   *
    +   * If a database is specified in `name`, this will drop the table from that database.
    +   * If no database is specified, this will first attempt to drop a temporary table with
    +   * the same name, then, if that does not exist, drop the table from the current database.
    +   */
    +  def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = {
    +    val db = name.database.getOrElse(currentDb)
    +    if (name.database.isDefined || !tempTables.containsKey(name.table)) {
    +      externalCatalog.dropTable(db, name.table, ignoreIfNotExists)
    +    } else {
    +      tempTables.remove(name.table)
    +    }
    +  }
    +
    +  /**
    +   * Return a [[LogicalPlan]] that represents the given table.
    +   *
    +   * If a database is specified in `name`, this will return the table from that database.
    +   * If no database is specified, this will first attempt to return a temporary table with
    +   * the same name, then, if that does not exist, return the table from the current database.
    +   */
    +  def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
    +    val db = name.database.getOrElse(currentDb)
    +    val relation =
    +      if (name.database.isDefined || !tempTables.containsKey(name.table)) {
    +        val metadata = externalCatalog.getTable(db, name.table)
    +        CatalogRelation(db, metadata, alias)
    +      } else {
    +        tempTables.get(name.table)
    +      }
    +    val tableWithQualifiers = SubqueryAlias(name.table, relation)
    +    // If an alias was specified by the lookup, wrap the plan in a subquery so that
    +    // attributes are properly qualified with this alias.
    +    alias.map(a => SubqueryAlias(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
    +  }
    +
    +  /**
    +   * List all tables in the specified database, including temporary tables.
    +   */
    +  def listTables(db: String): Seq[TableIdentifier] = {
    +    val dbTables = externalCatalog.listTables(db).map { t => TableIdentifier(t, Some(db)) }
    +    val _tempTables = tempTables.keys().asScala.map { t => TableIdentifier(t) }
    +    dbTables ++ _tempTables
    +  }
    +
    +  /**
    +   * List all matching tables in the specified database, including temporary tables.
    +   */
    +  def listTables(db: String, pattern: String): Seq[TableIdentifier] = {
    +    val dbTables =
    +      externalCatalog.listTables(db, pattern).map { t => TableIdentifier(t, Some(db)) }
    +    val regex = pattern.replaceAll("\\*", ".*").r
    +    val _tempTables = tempTables.keys().asScala
    +      .filter { t => regex.pattern.matcher(t).matches() }
    +      .map { t => TableIdentifier(t) }
    +    dbTables ++ _tempTables
    +  }
    +
    +  /**
    +   * Return a temporary table exactly as it was stored.
    +   * For testing only.
    +   */
    +  private[catalog] def getTempTable(name: String): Option[LogicalPlan] = {
    +    Option(tempTables.get(name))
    +  }
    +
    +  // ----------------------------------------------------------------------------
    +  // Partitions
    +  // ----------------------------------------------------------------------------
    +  // All methods in this category interact directly with the underlying catalog.
    +  // These methods are concerned with only metastore tables.
    +  // ----------------------------------------------------------------------------
    +
    +  // TODO: We need to figure out how these methods interact with our data source
    +  // tables. For such tables, we do not store values of partitioning columns in
    +  // the metastore. For now, partition values of a data source table will be
    +  // automatically discovered when we load the table.
    +
    +  /**
    +   * Create partitions in an existing table, assuming it exists.
    +   * If no database is specified, assume the table is in the current database.
    +   */
    +  def createPartitions(
    +      tableName: TableIdentifier,
    +      parts: Seq[CatalogTablePartition],
    +      ignoreIfExists: Boolean): Unit = {
    +    val db = tableName.database.getOrElse(currentDb)
    +    externalCatalog.createPartitions(db, tableName.table, parts, ignoreIfExists)
    +  }
    +
    +  /**
    +   * Drop partitions from a table, assuming they exist.
    +   * If no database is specified, assume the table is in the current database.
    +   */
    +  def dropPartitions(
    +      tableName: TableIdentifier,
    +      parts: Seq[TablePartitionSpec],
    +      ignoreIfNotExists: Boolean): Unit = {
    +    val db = tableName.database.getOrElse(currentDb)
    +    externalCatalog.dropPartitions(db, tableName.table, parts, ignoreIfNotExists)
    +  }
    +
    +  /**
    +   * Override the specs of one or many existing table partitions, assuming they exist.
    +   *
    +   * This assumes index i of `specs` corresponds to index i of `newSpecs`.
    +   * If no database is specified, assume the table is in the current database.
    +   */
    +  def renamePartitions(
    +      tableName: TableIdentifier,
    +      specs: Seq[TablePartitionSpec],
    +      newSpecs: Seq[TablePartitionSpec]): Unit = {
    +    val db = tableName.database.getOrElse(currentDb)
    +    externalCatalog.renamePartitions(db, tableName.table, specs, newSpecs)
    +  }
    +
    +  /**
    +   * Alter one or many table partitions whose specs that match those specified in `parts`,
    +   * assuming the partitions exist.
    +   *
    +   * If no database is specified, assume the table is in the current database.
    --- End diff --
    
    I don't think replicating this comment is that useful. This is just the semantics of the current database.




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11750#issuecomment-197596742
  
    **[Test build #2648 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2648/consoleFull)** for PR 11750 at commit [`ad43a5f`](https://github.com/apache/spark/commit/ad43a5ffdeeb881aaed8944971b63a27d1f4257f).




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11750#issuecomment-197620171
  
    **[Test build #53358 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53358/consoleFull)** for PR 11750 at commit [`ad43a5f`](https://github.com/apache/spark/commit/ad43a5ffdeeb881aaed8944971b63a27d1f4257f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11750#issuecomment-197131125
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11750#discussion_r56393684
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---
    @@ -0,0 +1,484 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.catalog
    +
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +
    +
    +/**
    + * An internal catalog that is used by a Spark Session. This internal catalog serves as a
    + * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
    + * tables and functions of the Spark Session that it belongs to.
    + */
    +class SessionCatalog(externalCatalog: ExternalCatalog) {
    +  import ExternalCatalog._
    +
    +  private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
    +
    +  private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
    +
    +  // ----------------------------------------------------------------------------
    +  // Databases
    +  // ----------------------------------------------------------------------------
    +  // All methods in this category interact directly with the underlying catalog.
    +  // ----------------------------------------------------------------------------
    +
    +  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
    +    externalCatalog.createDatabase(dbDefinition, ignoreIfExists)
    +  }
    +
    +  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
    +    externalCatalog.dropDatabase(db, ignoreIfNotExists, cascade)
    +  }
    +
    +  def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
    +    externalCatalog.alterDatabase(dbDefinition)
    +  }
    +
    +  def getDatabase(db: String): CatalogDatabase = {
    +    externalCatalog.getDatabase(db)
    +  }
    +
    +  def databaseExists(db: String): Boolean = {
    +    externalCatalog.databaseExists(db)
    +  }
    +
    +  def listDatabases(): Seq[String] = {
    +    externalCatalog.listDatabases()
    +  }
    +
    +  def listDatabases(pattern: String): Seq[String] = {
    +    externalCatalog.listDatabases(pattern)
    +  }
    +
    +  // ----------------------------------------------------------------------------
    +  // Tables
    +  // ----------------------------------------------------------------------------
    +  // There are two kinds of tables, temporary tables and metastore tables.
    +  // Temporary tables are isolated across sessions and do not belong to any
    +  // particular database. Metastore tables can be used across multiple
    +  // sessions as their metadata is persisted in the underlying catalog.
    +  // ----------------------------------------------------------------------------
    +
    +  // ----------------------------------------------------
    +  // | Methods that interact with metastore tables only |
    +  // ----------------------------------------------------
    +
    +  /**
    +   * Create a metastore table in the database specified in `tableDefinition`.
    +   * If no such database is specified, create it in the current database.
    +   */
    +  def createTable(
    +      currentDb: String,
    --- End diff --
    
    as discussed offline, this is because we need to deal with temporary tables. An alternative, which I will implement here, is to keep track of the current database in this class so we don't need to pass it in everywhere.
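
    A minimal sketch of that approach (it matches the shape of the later revision
    quoted further down in this thread):

        class SessionCatalog(externalCatalog: ExternalCatalog) {
          // Track the current database here so callers need not pass it everywhere.
          private[this] var currentDb = "default"

          def getCurrentDatabase: String = currentDb

          def setCurrentDatabase(db: String): Unit = {
            if (!externalCatalog.databaseExists(db)) {
              throw new AnalysisException(s"cannot set current database to non-existent '$db'")
            }
            currentDb = db
          }
        }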




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11750#discussion_r56276611
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---
    @@ -0,0 +1,484 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.catalog
    +
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +
    +
    +/**
    + * An internal catalog that is used by a Spark Session. This internal catalog serves as a
    + * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
    + * tables and functions of the Spark Session that it belongs to.
    + */
    +class SessionCatalog(externalCatalog: ExternalCatalog) {
    +  import ExternalCatalog._
    +
    +  private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
    +
    +  private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
    +
    +  // ----------------------------------------------------------------------------
    +  // Databases
    +  // ----------------------------------------------------------------------------
    +  // All methods in this category interact directly with the underlying catalog.
    +  // ----------------------------------------------------------------------------
    +
    +  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
    +    externalCatalog.createDatabase(dbDefinition, ignoreIfExists)
    +  }
    +
    +  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
    +    externalCatalog.dropDatabase(db, ignoreIfNotExists, cascade)
    +  }
    +
    +  def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
    +    externalCatalog.alterDatabase(dbDefinition)
    +  }
    +
    +  def getDatabase(db: String): CatalogDatabase = {
    +    externalCatalog.getDatabase(db)
    +  }
    +
    +  def databaseExists(db: String): Boolean = {
    +    externalCatalog.databaseExists(db)
    +  }
    +
    +  def listDatabases(): Seq[String] = {
    +    externalCatalog.listDatabases()
    +  }
    +
    +  def listDatabases(pattern: String): Seq[String] = {
    +    externalCatalog.listDatabases(pattern)
    +  }
    +
    +  // ----------------------------------------------------------------------------
    +  // Tables
    +  // ----------------------------------------------------------------------------
    +  // There are two kinds of tables, temporary tables and metastore tables.
    +  // Temporary tables are isolated across sessions and do not belong to any
    +  // particular database. Metastore tables can be used across multiple
    +  // sessions as their metadata is persisted in the underlying catalog.
    +  // ----------------------------------------------------------------------------
    +
    +  // ----------------------------------------------------
    +  // | Methods that interact with metastore tables only |
    +  // ----------------------------------------------------
    +
    +  /**
    +   * Create a metastore table in the database specified in `tableDefinition`.
    +   * If no such database is specified, create it in the current database.
    +   */
    +  def createTable(
    +      currentDb: String,
    --- End diff --
    
    it is somewhat strange to pass in currentDb and then rely on the table definition's database. Have you thought about resolving that part in the caller, outside the session catalog? I.e., the catalog itself wouldn't need to handle currentDb at all.
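
    For illustration, a sketch of that alternative, where a hypothetical
    caller-side helper (not part of this patch) qualifies the table before the
    catalog ever sees it:

        // The caller resolves the database up front, so the catalog
        // never needs a currentDb parameter.
        def qualify(table: CatalogTable, currentDb: String): CatalogTable = {
          val db = table.name.database.getOrElse(currentDb)
          table.copy(name = TableIdentifier(table.name.table, Some(db)))
        }

        // e.g. catalog.createTable(qualify(tableDefinition, currentDb), ignoreIfExists = false)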




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/11750#issuecomment-197586553
  
    If there are no major problems, let's merge this as soon as tests pass. We can address minor comments as follow-ups. This will unblock a bunch of other stuff.





[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11750#discussion_r56425133
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---
    @@ -211,8 +214,7 @@ case class CatalogTablePartition(
      * future once we have a better understanding of how we want to handle skewed columns.
      */
     case class CatalogTable(
    -    specifiedDatabase: Option[String],
    -    name: String,
    +    name: TableIdentifier,
    --- End diff --
    
    yeah, we should rename it. I think even something like `identifier` is better. Let's do that later.




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11750#discussion_r56437475
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---
    @@ -0,0 +1,469 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.catalog
    +
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +
    +
    +/**
    + * An internal catalog that is used by a Spark Session. This internal catalog serves as a
    + * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
    + * tables and functions of the Spark Session that it belongs to.
    + */
    +class SessionCatalog(externalCatalog: ExternalCatalog) {
    +  import ExternalCatalog._
    +
    +  private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
    --- End diff --
    
    I copied this from the existing catalog. Actually, I do not think multiple threads will use the same SessionCatalog. We can change it later.




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11750#issuecomment-197625780
  
    **[Test build #2648 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2648/consoleFull)** for PR 11750 at commit [`ad43a5f`](https://github.com/apache/spark/commit/ad43a5ffdeeb881aaed8944971b63a27d1f4257f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11750#issuecomment-197620377
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53358/
    Test PASSed.




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/11750#issuecomment-197586729
  
    Yea, let's merge this once it passes tests.




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11750#discussion_r56441976
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---
    @@ -0,0 +1,469 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.catalog
    +
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +
    +
    +/**
    + * An internal catalog that is used by a Spark Session. This internal catalog serves as a
    + * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
    + * tables and functions of the Spark Session that it belongs to.
    + */
    +class SessionCatalog(externalCatalog: ExternalCatalog) {
    +  import ExternalCatalog._
    +
    +  private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
    --- End diff --
    
    good catch. I just copied this from Yin's code. We can just make this a normal hash map then.
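
    A minimal sketch of that change (assuming, per the discussion above, that a
    SessionCatalog is only ever used by one thread at a time):

        import scala.collection.mutable

        // Plain hash maps suffice if the catalog is confined to a single thread.
        private[this] val tempTables = new mutable.HashMap[String, LogicalPlan]
        private[this] val tempFunctions = new mutable.HashMap[String, CatalogFunction]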




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11750#issuecomment-197131108
  
    **[Test build #53263 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53263/consoleFull)** for PR 11750 at commit [`3b2e48a`](https://github.com/apache/spark/commit/3b2e48a4efa81bb886331ce5818e2d44c7c2f7da).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11750#issuecomment-197620375
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11750#issuecomment-197625591
  
    **[Test build #2647 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2647/consoleFull)** for PR 11750 at commit [`ad43a5f`](https://github.com/apache/spark/commit/ad43a5ffdeeb881aaed8944971b63a27d1f4257f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/11750#issuecomment-197127331
  
    @yhuai @rxin




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/11750#issuecomment-197627234
  
    OK. Let me merge this. Let's address comments in the follow-up PR.




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11750#discussion_r56422492
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---
    @@ -211,8 +214,7 @@ case class CatalogTablePartition(
      * future once we have a better understanding of how we want to handle skewed columns.
      */
     case class CatalogTable(
    -    specifiedDatabase: Option[String],
    -    name: String,
    +    name: TableIdentifier,
    --- End diff --
    
    Maybe we can rename this `name` later.




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/11750




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11750#issuecomment-197127120
  
    **[Test build #53263 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53263/consoleFull)** for PR 11750 at commit [`3b2e48a`](https://github.com/apache/spark/commit/3b2e48a4efa81bb886331ce5818e2d44c7c2f7da).




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11750#issuecomment-197558997
  
    **[Test build #53358 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53358/consoleFull)** for PR 11750 at commit [`ad43a5f`](https://github.com/apache/spark/commit/ad43a5ffdeeb881aaed8944971b63a27d1f4257f).




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11750#issuecomment-197596708
  
    **[Test build #2647 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2647/consoleFull)** for PR 11750 at commit [`ad43a5f`](https://github.com/apache/spark/commit/ad43a5ffdeeb881aaed8944971b63a27d1f4257f).




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11750#discussion_r56424506
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---
    @@ -0,0 +1,469 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.catalog
    +
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +
    +
    +/**
    + * An internal catalog that is used by a Spark Session. This internal catalog serves as a
    + * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
    + * tables and functions of the Spark Session that it belongs to.
    + */
    +class SessionCatalog(externalCatalog: ExternalCatalog) {
    +  import ExternalCatalog._
    +
    +  private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
    +  private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
    +
    +  // Note: we track current database here because certain operations do not explicitly
    +  // specify the database (e.g. DROP TABLE my_table). In these cases we must first
    +  // check whether the temporary table or function exists, then, if not, operate on
    +  // the corresponding item in the current database.
    +  private[this] var currentDb = "default"
    +
    +  // ----------------------------------------------------------------------------
    +  // Databases
    +  // ----------------------------------------------------------------------------
    +  // All methods in this category interact directly with the underlying catalog.
    +  // ----------------------------------------------------------------------------
    +
    +  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
    +    externalCatalog.createDatabase(dbDefinition, ignoreIfExists)
    +  }
    +
    +  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
    +    externalCatalog.dropDatabase(db, ignoreIfNotExists, cascade)
    +  }
    +
    +  def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
    +    externalCatalog.alterDatabase(dbDefinition)
    +  }
    +
    +  def getDatabase(db: String): CatalogDatabase = {
    +    externalCatalog.getDatabase(db)
    +  }
    +
    +  def databaseExists(db: String): Boolean = {
    +    externalCatalog.databaseExists(db)
    +  }
    +
    +  def listDatabases(): Seq[String] = {
    +    externalCatalog.listDatabases()
    +  }
    +
    +  def listDatabases(pattern: String): Seq[String] = {
    +    externalCatalog.listDatabases(pattern)
    +  }
    +
    +  def getCurrentDatabase: String = currentDb
    +
    +  def setCurrentDatabase(db: String): Unit = {
    +    if (!databaseExists(db)) {
    +      throw new AnalysisException(s"cannot set current database to non-existent '$db'")
    +    }
    +    currentDb = db
    +  }
    +
    +  // ----------------------------------------------------------------------------
    +  // Tables
    +  // ----------------------------------------------------------------------------
    +  // There are two kinds of tables, temporary tables and metastore tables.
    +  // Temporary tables are isolated across sessions and do not belong to any
    +  // particular database. Metastore tables can be used across multiple
    +  // sessions as their metadata is persisted in the underlying catalog.
    +  // ----------------------------------------------------------------------------
    +
    +  // ----------------------------------------------------
    +  // | Methods that interact with metastore tables only |
    +  // ----------------------------------------------------
    +
    +  /**
    +   * Create a metastore table in the database specified in `tableDefinition`.
    +   * If no such database is specified, create it in the current database.
    +   */
    +  def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
    +    val db = tableDefinition.name.database.getOrElse(currentDb)
    +    val newTableDefinition = tableDefinition.copy(
    +      name = TableIdentifier(tableDefinition.name.table, Some(db)))
    +    externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
    +  }
    +
    +  /**
    +   * Alter the metadata of an existing metastore table identified by `tableDefinition`.
    +   *
    +   * If no database is specified in `tableDefinition`, assume the table is in the
    +   * current database.
    +   *
    +   * Note: If the underlying implementation does not support altering a certain field,
    +   * this becomes a no-op.
    +   */
    +  def alterTable(tableDefinition: CatalogTable): Unit = {
    +    val db = tableDefinition.name.database.getOrElse(currentDb)
    +    val newTableDefinition = tableDefinition.copy(
    +      name = TableIdentifier(tableDefinition.name.table, Some(db)))
    +    externalCatalog.alterTable(db, newTableDefinition)
    +  }
    +
    +  /**
    +   * Retrieve the metadata of an existing metastore table.
    +   * If no database is specified, assume the table is in the current database.
    +   */
    +  def getTable(name: TableIdentifier): CatalogTable = {
    +    val db = name.database.getOrElse(currentDb)
    +    externalCatalog.getTable(db, name.table)
    +  }
    +
    +  // -------------------------------------------------------------
    +  // | Methods that interact with temporary and metastore tables |
    +  // -------------------------------------------------------------
    +
    +  /**
    +   * Create a temporary table.
    +   */
    +  def createTempTable(
    +      name: String,
    +      tableDefinition: LogicalPlan,
    +      ignoreIfExists: Boolean): Unit = {
    +    if (tempTables.containsKey(name) && !ignoreIfExists) {
    +      throw new AnalysisException(s"Temporary table '$name' already exists.")
    +    }
    +    tempTables.put(name, tableDefinition)
    +  }
    +
    +  /**
    +   * Rename a table.
    +   *
    +   * If a database is specified in `oldName`, this will rename the table in that database.
    +   * If no database is specified, this will first attempt to rename a temporary table with
    +   * the same name, then, if that does not exist, rename the table in the current database.
    +   *
    +   * This assumes the database specified in `oldName` matches the one specified in `newName`.
    +   */
    +  def renameTable(oldName: TableIdentifier, newName: TableIdentifier): Unit = {
    +    if (oldName.database != newName.database) {
    +      throw new AnalysisException("rename does not support moving tables across databases")
    +    }
    +    val db = oldName.database.getOrElse(currentDb)
    +    if (oldName.database.isDefined || !tempTables.containsKey(oldName.table)) {
    +      externalCatalog.renameTable(db, oldName.table, newName.table)
    +    } else {
    +      val table = tempTables.remove(oldName.table)
    +      tempTables.put(newName.table, table)
    +    }
    +  }
    +
    +  /**
    +   * Drop a table.
    +   *
    +   * If a database is specified in `name`, this will drop the table from that database.
    +   * If no database is specified, this will first attempt to drop a temporary table with
    +   * the same name, then, if that does not exist, drop the table from the current database.
    +   */
    +  def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = {
    --- End diff --
    
    This is the semantics of Postgres and Hive.
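
    A usage sketch of that resolution order (the table name and `somePlan` are
    hypothetical):

        // An unqualified name checks temporary tables first...
        catalog.createTempTable("logs", somePlan, ignoreIfExists = false)
        catalog.dropTable(TableIdentifier("logs"), ignoreIfNotExists = false)  // drops the temp table

        // ...while a qualified name always targets the metastore.
        catalog.dropTable(TableIdentifier("logs", Some("default")), ignoreIfNotExists = false)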




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11750#discussion_r56442105
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---
    @@ -0,0 +1,469 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.catalog
    +
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +
    +
    +/**
    + * An internal catalog that is used by a Spark Session. This internal catalog serves as a
    + * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
    + * tables and functions of the Spark Session that it belongs to.
    + */
    +class SessionCatalog(externalCatalog: ExternalCatalog) {
    +  import ExternalCatalog._
    +
    +  private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
    +  private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
    +
    +  // Note: we track current database here because certain operations do not explicitly
    +  // specify the database (e.g. DROP TABLE my_table). In these cases we must first
    +  // check whether the temporary table or function exists, then, if not, operate on
    +  // the corresponding item in the current database.
    +  private[this] var currentDb = "default"
    +
    +  // ----------------------------------------------------------------------------
    +  // Databases
    +  // ----------------------------------------------------------------------------
    +  // All methods in this category interact directly with the underlying catalog.
    +  // ----------------------------------------------------------------------------
    +
    +  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
    +    externalCatalog.createDatabase(dbDefinition, ignoreIfExists)
    +  }
    +
    +  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
    +    externalCatalog.dropDatabase(db, ignoreIfNotExists, cascade)
    +  }
    +
    +  def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
    +    externalCatalog.alterDatabase(dbDefinition)
    +  }
    +
    +  def getDatabase(db: String): CatalogDatabase = {
    +    externalCatalog.getDatabase(db)
    +  }
    +
    +  def databaseExists(db: String): Boolean = {
    +    externalCatalog.databaseExists(db)
    +  }
    +
    +  def listDatabases(): Seq[String] = {
    +    externalCatalog.listDatabases()
    +  }
    +
    +  def listDatabases(pattern: String): Seq[String] = {
    +    externalCatalog.listDatabases(pattern)
    +  }
    +
    +  def getCurrentDatabase: String = currentDb
    +
    +  def setCurrentDatabase(db: String): Unit = {
    +    if (!databaseExists(db)) {
    +      throw new AnalysisException(s"cannot set current database to non-existent '$db'")
    +    }
    +    currentDb = db
    +  }
    +
    +  // ----------------------------------------------------------------------------
    +  // Tables
    +  // ----------------------------------------------------------------------------
    +  // There are two kinds of tables, temporary tables and metastore tables.
    +  // Temporary tables are isolated across sessions and do not belong to any
    +  // particular database. Metastore tables can be used across multiple
    +  // sessions as their metadata is persisted in the underlying catalog.
    +  // ----------------------------------------------------------------------------
    +
    +  // ----------------------------------------------------
    +  // | Methods that interact with metastore tables only |
    +  // ----------------------------------------------------
    +
    +  /**
    +   * Create a metastore table in the database specified in `tableDefinition`.
    +   * If no such database is specified, create it in the current database.
    +   */
    +  def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
    +    val db = tableDefinition.name.database.getOrElse(currentDb)
    +    val newTableDefinition = tableDefinition.copy(
    +      name = TableIdentifier(tableDefinition.name.table, Some(db)))
    +    externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
    +  }
    +
    +  /**
    +   * Alter the metadata of an existing metastore table identified by `tableDefinition`.
    +   *
    +   * If no database is specified in `tableDefinition`, assume the table is in the
    +   * current database.
    +   *
    +   * Note: If the underlying implementation does not support altering a certain field,
    +   * this becomes a no-op.
    +   */
    +  def alterTable(tableDefinition: CatalogTable): Unit = {
    +    val db = tableDefinition.name.database.getOrElse(currentDb)
    +    val newTableDefinition = tableDefinition.copy(
    +      name = TableIdentifier(tableDefinition.name.table, Some(db)))
    +    externalCatalog.alterTable(db, newTableDefinition)
    +  }
    +
    +  /**
    +   * Retrieve the metadata of an existing metastore table.
    +   * If no database is specified, assume the table is in the current database.
    +   */
    +  def getTable(name: TableIdentifier): CatalogTable = {
    +    val db = name.database.getOrElse(currentDb)
    +    externalCatalog.getTable(db, name.table)
    +  }
    +
    +  // -------------------------------------------------------------
    +  // | Methods that interact with temporary and metastore tables |
    +  // -------------------------------------------------------------
    +
    +  /**
    +   * Create a temporary table.
    +   */
    +  def createTempTable(
    +      name: String,
    +      tableDefinition: LogicalPlan,
    +      ignoreIfExists: Boolean): Unit = {
    +    if (tempTables.containsKey(name) && !ignoreIfExists) {
    +      throw new AnalysisException(s"Temporary table '$name' already exists.")
    +    }
    +    tempTables.put(name, tableDefinition)
    --- End diff --
    
    `ignoreIfExists` reads as "ignore the exception if the table already exists", but it doesn't convey whether the existing table is overwritten. I could rename this.
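
    A sketch of one possible rename (the new parameter name is a suggestion, not
    part of this patch):

        /** Create a temporary table, optionally replacing an existing one with the same name. */
        def createTempTable(
            name: String,
            tableDefinition: LogicalPlan,
            overrideIfExists: Boolean): Unit = {
          if (tempTables.containsKey(name) && !overrideIfExists) {
            throw new AnalysisException(s"Temporary table '$name' already exists.")
          }
          tempTables.put(name, tableDefinition)
        }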




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11750#discussion_r56437018
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---
    @@ -0,0 +1,469 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.catalog
    +
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +
    +
    +/**
    + * An internal catalog that is used by a Spark Session. This internal catalog serves as a
    + * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
    + * tables and functions of the Spark Session that it belongs to.
    + */
    +class SessionCatalog(externalCatalog: ExternalCatalog) {
    +  import ExternalCatalog._
    +
    +  private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
    --- End diff --
    
    Why concurrent? Comment on expected thread safety of this class.




[GitHub] spark pull request: [SPARK-13923] [SQL] Implement SessionCatalog

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11750#discussion_r56442370
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---
    @@ -0,0 +1,469 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.catalog
    +
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    +
    +
    +/**
    + * An internal catalog that is used by a Spark Session. This internal catalog serves as a
    + * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
    + * tables and functions of the Spark Session that it belongs to.
    + */
    +class SessionCatalog(externalCatalog: ExternalCatalog) {
    +  import ExternalCatalog._
    +
    +  private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
    +  private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
    +
    +  // Note: we track current database here because certain operations do not explicitly
    +  // specify the database (e.g. DROP TABLE my_table). In these cases we must first
    +  // check whether the temporary table or function exists, then, if not, operate on
    +  // the corresponding item in the current database.
    +  private[this] var currentDb = "default"
    +
    +  // ----------------------------------------------------------------------------
    +  // Databases
    +  // ----------------------------------------------------------------------------
    +  // All methods in this category interact directly with the underlying catalog.
    +  // ----------------------------------------------------------------------------
    +
    +  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
    +    externalCatalog.createDatabase(dbDefinition, ignoreIfExists)
    +  }
    +
    +  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
    +    externalCatalog.dropDatabase(db, ignoreIfNotExists, cascade)
    +  }
    +
    +  def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
    +    externalCatalog.alterDatabase(dbDefinition)
    +  }
    +
    +  def getDatabase(db: String): CatalogDatabase = {
    +    externalCatalog.getDatabase(db)
    +  }
    +
    +  def databaseExists(db: String): Boolean = {
    +    externalCatalog.databaseExists(db)
    +  }
    +
    +  def listDatabases(): Seq[String] = {
    +    externalCatalog.listDatabases()
    +  }
    +
    +  def listDatabases(pattern: String): Seq[String] = {
    +    externalCatalog.listDatabases(pattern)
    +  }
    +
    +  def getCurrentDatabase: String = currentDb
    +
    +  def setCurrentDatabase(db: String): Unit = {
    +    if (!databaseExists(db)) {
    +      throw new AnalysisException(s"cannot set current database to non-existent '$db'")
    +    }
    +    currentDb = db
    +  }
    +
    +  // ----------------------------------------------------------------------------
    +  // Tables
    +  // ----------------------------------------------------------------------------
    +  // There are two kinds of tables, temporary tables and metastore tables.
    +  // Temporary tables are isolated across sessions and do not belong to any
    +  // particular database. Metastore tables can be used across multiple
    +  // sessions as their metadata is persisted in the underlying catalog.
    +  // ----------------------------------------------------------------------------
    +
    +  // ----------------------------------------------------
    +  // | Methods that interact with metastore tables only |
    +  // ----------------------------------------------------
    +
    +  /**
    +   * Create a metastore table in the database specified in `tableDefinition`.
    +   * If no such database is specified, create it in the current database.
    +   */
    +  def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
    +    val db = tableDefinition.name.database.getOrElse(currentDb)
    +    val newTableDefinition = tableDefinition.copy(
    +      name = TableIdentifier(tableDefinition.name.table, Some(db)))
    +    externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
    +  }
    +
    +  /**
    +   * Alter the metadata of an existing metastore table identified by `tableDefinition`.
    +   *
    +   * If no database is specified in `tableDefinition`, assume the table is in the
    +   * current database.
    +   *
    +   * Note: If the underlying implementation does not support altering a certain field,
    +   * this becomes a no-op.
    +   */
    +  def alterTable(tableDefinition: CatalogTable): Unit = {
    +    val db = tableDefinition.name.database.getOrElse(currentDb)
    +    val newTableDefinition = tableDefinition.copy(
    +      name = TableIdentifier(tableDefinition.name.table, Some(db)))
    +    externalCatalog.alterTable(db, newTableDefinition)
    +  }
    +
    +  /**
    +   * Retrieve the metadata of an existing metastore table.
    +   * If no database is specified, assume the table is in the current database.
    +   */
    +  def getTable(name: TableIdentifier): CatalogTable = {
    +    val db = name.database.getOrElse(currentDb)
    +    externalCatalog.getTable(db, name.table)
    +  }
    +
    +  // -------------------------------------------------------------
    +  // | Methods that interact with temporary and metastore tables |
    +  // -------------------------------------------------------------
    +
    +  /**
    +   * Create a temporary table.
    +   */
    +  def createTempTable(
    +      name: String,
    +      tableDefinition: LogicalPlan,
    +      ignoreIfExists: Boolean): Unit = {
    +    if (tempTables.containsKey(name) && !ignoreIfExists) {
    +      throw new AnalysisException(s"Temporary table '$name' already exists.")
    +    }
    +    tempTables.put(name, tableDefinition)
    --- End diff --
    
    Let's make this name more explicit. I think in other places, `ignoreIfExists` means that if a table exists, we do nothing.

