You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2016/10/10 07:49:16 UTC

[1/2] spark git commit: [SPARK-17338][SQL] add global temp view

Repository: spark
Updated Branches:
  refs/heads/master 16590030c -> 23ddff4b2


http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 1bcb810..1988515 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -969,17 +969,17 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         """.stripMargin)
       checkAnswer(
         sql("SHOW TABLES IN default 'show1*'"),
-        Row("show1a", true) :: Nil)
+        Row("", "show1a", true) :: Nil)
 
       checkAnswer(
         sql("SHOW TABLES IN default 'show1*|show2*'"),
-        Row("show1a", true) ::
-          Row("show2b", true) :: Nil)
+        Row("", "show1a", true) ::
+          Row("", "show2b", true) :: Nil)
 
       checkAnswer(
         sql("SHOW TABLES 'show1*|show2*'"),
-        Row("show1a", true) ::
-          Row("show2b", true) :: Nil)
+        Row("", "show1a", true) ::
+          Row("", "show2b", true) :: Nil)
 
       assert(
         sql("SHOW TABLES").count() >= 2)

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 85c5098..85ecf0c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -41,6 +41,7 @@ import org.apache.spark.util.Utils
 
 private[sql] class HiveSessionCatalog(
     externalCatalog: HiveExternalCatalog,
+    globalTempViewManager: GlobalTempViewManager,
     sparkSession: SparkSession,
     functionResourceLoader: FunctionResourceLoader,
     functionRegistry: FunctionRegistry,
@@ -48,6 +49,7 @@ private[sql] class HiveSessionCatalog(
     hadoopConf: Configuration)
   extends SessionCatalog(
     externalCatalog,
+    globalTempViewManager,
     functionResourceLoader,
     functionRegistry,
     conf,

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index eb10c113..6d4fe1a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -45,6 +45,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
   override lazy val catalog = {
     new HiveSessionCatalog(
       sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
+      sparkSession.sharedState.globalTempViewManager,
       sparkSession,
       functionResourceLoader,
       functionRegistry,

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
index 57363b7..939fd71 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
@@ -87,11 +87,11 @@ class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEac
     assert(
       hc.sql("SELECT * FROM moo_table order by name").collect().toSeq ==
       df.collect().toSeq.sortBy(_.getString(0)))
-    val tables = hc.sql("SHOW TABLES IN mee_db").collect().map(_.getString(0))
+    val tables = hc.sql("SHOW TABLES IN mee_db").select("tableName").collect().map(_.getString(0))
     assert(tables.toSet == Set("moo_table", "mee_table"))
     hc.sql("DROP TABLE moo_table")
     hc.sql("DROP TABLE mee_table")
-    val tables2 = hc.sql("SHOW TABLES IN mee_db").collect().map(_.getString(0))
+    val tables2 = hc.sql("SHOW TABLES IN mee_db").select("tableName").collect().map(_.getString(0))
     assert(tables2.isEmpty)
     hc.sql("USE default")
     hc.sql("DROP DATABASE mee_db CASCADE")

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
index 6eeb675..15ba616 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
@@ -58,10 +58,10 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft
         // We are using default DB.
         checkAnswer(
           allTables.filter("tableName = 'listtablessuitetable'"),
-          Row("listtablessuitetable", true))
+          Row("", "listtablessuitetable", true))
         checkAnswer(
           allTables.filter("tableName = 'hivelisttablessuitetable'"),
-          Row("hivelisttablessuitetable", false))
+          Row("default", "hivelisttablessuitetable", false))
         assert(allTables.filter("tableName = 'hiveindblisttablessuitetable'").count() === 0)
     }
   }
@@ -71,11 +71,11 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft
       case allTables =>
         checkAnswer(
           allTables.filter("tableName = 'listtablessuitetable'"),
-          Row("listtablessuitetable", true))
+          Row("", "listtablessuitetable", true))
         assert(allTables.filter("tableName = 'hivelisttablessuitetable'").count() === 0)
         checkAnswer(
           allTables.filter("tableName = 'hiveindblisttablessuitetable'"),
-          Row("hiveindblisttablessuitetable", false))
+          Row("listtablessuitedb", "hiveindblisttablessuitetable", false))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 8ae6868..5167064 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -984,7 +984,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
     checkAnswer(
       spark.sql("show TABLES in testdb8156").filter("tableName = 'ttt3'"),
-      Row("ttt3", false))
+      Row("testdb8156", "ttt3", false))
     spark.sql("""use default""")
     spark.sql("""drop database if exists testdb8156 CASCADE""")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index b2103b3..2c772ce 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -94,15 +94,15 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       sql("CREATE TABLE show2b(c2 int)")
       checkAnswer(
         sql("SHOW TABLES IN default 'show1*'"),
-        Row("show1a", false) :: Nil)
+        Row("default", "show1a", false) :: Nil)
       checkAnswer(
         sql("SHOW TABLES IN default 'show1*|show2*'"),
-        Row("show1a", false) ::
-          Row("show2b", false) :: Nil)
+        Row("default", "show1a", false) ::
+          Row("default", "show2b", false) :: Nil)
       checkAnswer(
         sql("SHOW TABLES 'show1*|show2*'"),
-        Row("show1a", false) ::
-          Row("show2b", false) :: Nil)
+        Row("default", "show1a", false) ::
+          Row("default", "show2b", false) :: Nil)
       assert(
         sql("SHOW TABLES").count() >= 2)
       assert(

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index f5c605f..2af935d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -62,15 +62,15 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       var e = intercept[AnalysisException] {
         sql("CREATE OR REPLACE VIEW tab1 AS SELECT * FROM jt")
       }.getMessage
-      assert(e.contains("`default`.`tab1` is not a view"))
+      assert(e.contains("`tab1` is not a view"))
       e = intercept[AnalysisException] {
         sql("CREATE VIEW tab1 AS SELECT * FROM jt")
       }.getMessage
-      assert(e.contains("`default`.`tab1` is not a view"))
+      assert(e.contains("`tab1` is not a view"))
       e = intercept[AnalysisException] {
         sql("ALTER VIEW tab1 AS SELECT * FROM jt")
       }.getMessage
-      assert(e.contains("`default`.`tab1` is not a view"))
+      assert(e.contains("`tab1` is not a view"))
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/2] spark git commit: [SPARK-17338][SQL] add global temp view

Posted by we...@apache.org.
[SPARK-17338][SQL] add global temp view

## What changes were proposed in this pull request?

A global temporary view is a cross-session temporary view, which means it's shared among all sessions. Its lifetime is the lifetime of the Spark application, i.e. it will be automatically dropped when the application terminates. It's tied to a system preserved database `global_temp` (configurable via SparkConf), and we must use the qualified name to refer to a global temp view, e.g. `SELECT * FROM global_temp.view1`.

changes for `SessionCatalog`:

1. add a new field `globalTempViews: GlobalTempViewManager`, to access the shared global temp views, and the global temp db name.
2. `createDatabase` will fail if users try to create `global_temp`, which is system preserved.
3. `setCurrentDatabase` will fail if users try to set `global_temp`, which is system preserved.
4. add `createGlobalTempView`, which is used in `CreateViewCommand` to create global temp views.
5. add `dropGlobalTempView`, which is used in `CatalogImpl` to drop global temp view.
6. add `alterTempViewDefinition`, which is used in `AlterViewAsCommand` to update the view definition for local/global temp views.
7. `renameTable`/`dropTable`/`isTemporaryTable`/`lookupRelation`/`getTempViewOrPermanentTableMetadata`/`refreshTable` will handle global temp views.

changes for SQL commands:

1. `CreateViewCommand`/`AlterViewAsCommand` are updated to support global temp views.
2. `ShowTablesCommand` outputs a new column `database`, which is used to distinguish global and local temp views.
3. other commands can also handle global temp views if they call `SessionCatalog` APIs which accept global temp views, e.g. `DropTableCommand`, `AlterTableRenameCommand`, `ShowColumnsCommand`, etc.

changes for other public API

1. add a new method `dropGlobalTempView` in `Catalog`
2. `Catalog.findTable` can find global temp view
3. add a new method `createGlobalTempView` in `Dataset`

## How was this patch tested?

new tests in `SQLViewSuite`

Author: Wenchen Fan <we...@databricks.com>

Closes #14897 from cloud-fan/global-temp-view.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23ddff4b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23ddff4b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23ddff4b

Branch: refs/heads/master
Commit: 23ddff4b2b2744c3dc84d928e144c541ad5df376
Parents: 1659003
Author: Wenchen Fan <we...@databricks.com>
Authored: Mon Oct 10 15:48:57 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Oct 10 15:48:57 2016 +0800

----------------------------------------------------------------------
 .../apache/spark/internal/config/package.scala  |   7 +
 docs/sql-programming-guide.md                   |  45 ++++-
 .../spark/examples/sql/JavaSparkSQLExample.java |  30 ++-
 examples/src/main/python/sql/basic.py           |  25 +++
 .../spark/examples/sql/SparkSQLExample.scala    |  25 +++
 project/MimaExcludes.scala                      |   4 +-
 python/pyspark/sql/catalog.py                   |  18 +-
 python/pyspark/sql/context.py                   |   2 +-
 python/pyspark/sql/dataframe.py                 |  25 ++-
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |   8 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  10 +-
 .../catalog/GlobalTempViewManager.scala         | 121 ++++++++++++
 .../sql/catalyst/catalog/SessionCatalog.scala   | 189 +++++++++++++++----
 .../scala/org/apache/spark/sql/Dataset.scala    |  48 ++++-
 .../org/apache/spark/sql/catalog/Catalog.scala  |  20 +-
 .../spark/sql/execution/QueryExecution.scala    |   8 +-
 .../spark/sql/execution/SparkSqlParser.scala    |  19 +-
 .../spark/sql/execution/command/ddl.scala       |  25 +--
 .../spark/sql/execution/command/tables.scala    |  11 +-
 .../spark/sql/execution/command/views.scala     | 150 ++++++++-------
 .../spark/sql/execution/datasources/ddl.scala   |  20 +-
 .../apache/spark/sql/internal/CatalogImpl.scala |  26 ++-
 .../spark/sql/internal/SessionState.scala       |   1 +
 .../apache/spark/sql/internal/SharedState.scala |  75 +++++---
 .../org/apache/spark/sql/SQLContextSuite.scala  |  11 +-
 .../sql/execution/GlobalTempViewSuite.scala     | 168 +++++++++++++++++
 .../spark/sql/execution/command/DDLSuite.scala  |  10 +-
 .../spark/sql/hive/HiveSessionCatalog.scala     |   4 +-
 .../spark/sql/hive/HiveSessionState.scala       |   1 +
 .../hive/HiveContextCompatibilitySuite.scala    |   4 +-
 .../apache/spark/sql/hive/ListTablesSuite.scala |   8 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |   2 +-
 .../sql/hive/execution/HiveCommandSuite.scala   |  10 +-
 .../spark/sql/hive/execution/SQLViewSuite.scala |   6 +-
 34 files changed, 906 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index d536cc5..0896e68 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -98,6 +98,13 @@ package object config {
     .checkValues(Set("hive", "in-memory"))
     .createWithDefault("in-memory")
 
+  // Note: This is a SQL config but needs to be in core because it's cross-session and cannot be
+  // put in SQLConf.
+  private[spark] val GLOBAL_TEMP_DATABASE = ConfigBuilder("spark.sql.globalTempDatabase")
+    .internal()
+    .stringConf
+    .createWithDefault("global_temp")
+
   private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
     ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
       .intConf

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 71bdd19..835cb69 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -220,6 +220,41 @@ The `sql` function enables applications to run SQL queries programmatically and
 </div>
 
 
+## Global Temporary View
+
+Temporary views in Spark SQL are session-scoped and will disappear if the session that creates them
+terminates. If you want to have a temporary view that is shared among all sessions and kept alive
+until the Spark application terminates, you can create a global temporary view. A global temporary
+view is tied to a system preserved database `global_temp`, and we must use the qualified name to
+refer to it, e.g. `SELECT * FROM global_temp.view1`.
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+{% include_example global_temp_view scala/org/apache/spark/examples/sql/SparkSQLExample.scala %}
+</div>
+
+<div data-lang="java" markdown="1">
+{% include_example global_temp_view java/org/apache/spark/examples/sql/JavaSparkSQLExample.java %}
+</div>
+
+<div data-lang="python"  markdown="1">
+{% include_example global_temp_view python/sql/basic.py %}
+</div>
+
+<div data-lang="sql"  markdown="1">
+
+{% highlight sql %}
+
+CREATE GLOBAL TEMPORARY VIEW temp_view AS SELECT a + 1, b * 2 FROM tbl
+
+SELECT * FROM global_temp.temp_view
+
+{% endhighlight %}
+
+</div>
+</div>
+
+
 ## Creating Datasets
 
 Datasets are similar to RDDs, however, instead of using Java serialization or Kryo they use
@@ -1058,14 +1093,14 @@ the Data Sources API. The following options are supported:
       The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows).
     </td>
   </tr>
-  
+
   <tr>
     <td><code>truncate</code></td>
     <td>
-     This is a JDBC writer related option. When <code>SaveMode.Overwrite</code> is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and prevents the table metadata (e.g. indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. It defaults to <code>false</code>. 
+     This is a JDBC writer related option. When <code>SaveMode.Overwrite</code> is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and prevents the table metadata (e.g. indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. It defaults to <code>false</code>.
    </td>
   </tr>
-  
+
   <tr>
     <td><code>createTableOptions</code></td>
     <td>
@@ -1101,11 +1136,11 @@ USING org.apache.spark.sql.jdbc
 OPTIONS (
   url "jdbc:postgresql:dbserver",
   dbtable "schema.tablename",
-  user 'username', 
+  user 'username',
   password 'password'
 )
 
-INSERT INTO TABLE jdbcTable 
+INSERT INTO TABLE jdbcTable
 SELECT * FROM resultTable
 {% endhighlight %}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
index cff9032..c5770d1 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
@@ -54,6 +54,7 @@ import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 // $example off:programmatic_schema$
+import org.apache.spark.sql.AnalysisException;
 
 // $example on:untyped_ops$
 // col("...") is preferable to df.col("...")
@@ -84,7 +85,7 @@ public class JavaSparkSQLExample {
   }
   // $example off:create_ds$
 
-  public static void main(String[] args) {
+  public static void main(String[] args) throws AnalysisException {
     // $example on:init_session$
     SparkSession spark = SparkSession
       .builder()
@@ -101,7 +102,7 @@ public class JavaSparkSQLExample {
     spark.stop();
   }
 
-  private static void runBasicDataFrameExample(SparkSession spark) {
+  private static void runBasicDataFrameExample(SparkSession spark) throws AnalysisException {
     // $example on:create_df$
     Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
 
@@ -176,6 +177,31 @@ public class JavaSparkSQLExample {
     // |  19| Justin|
     // +----+-------+
     // $example off:run_sql$
+
+    // $example on:global_temp_view$
+    // Register the DataFrame as a global temporary view
+    df.createGlobalTempView("people");
+
+    // Global temporary view is tied to a system preserved database `global_temp`
+    spark.sql("SELECT * FROM global_temp.people").show();
+    // +----+-------+
+    // | age|   name|
+    // +----+-------+
+    // |null|Michael|
+    // |  30|   Andy|
+    // |  19| Justin|
+    // +----+-------+
+
+    // Global temporary view is cross-session
+    spark.newSession().sql("SELECT * FROM global_temp.people").show();
+    // +----+-------+
+    // | age|   name|
+    // +----+-------+
+    // |null|Michael|
+    // |  30|   Andy|
+    // |  19| Justin|
+    // +----+-------+
+    // $example off:global_temp_view$
   }
 
   private static void runDatasetCreationExample(SparkSession spark) {

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/examples/src/main/python/sql/basic.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/sql/basic.py b/examples/src/main/python/sql/basic.py
index fdc017a..ebcf669 100644
--- a/examples/src/main/python/sql/basic.py
+++ b/examples/src/main/python/sql/basic.py
@@ -114,6 +114,31 @@ def basic_df_example(spark):
     # +----+-------+
     # $example off:run_sql$
 
+    # $example on:global_temp_view$
+    # Register the DataFrame as a global temporary view
+    df.createGlobalTempView("people")
+
+    # Global temporary view is tied to a system preserved database `global_temp`
+    spark.sql("SELECT * FROM global_temp.people").show()
+    # +----+-------+
+    # | age|   name|
+    # +----+-------+
+    # |null|Michael|
+    # |  30|   Andy|
+    # |  19| Justin|
+    # +----+-------+
+
+    # Global temporary view is cross-session
+    spark.newSession().sql("SELECT * FROM global_temp.people").show()
+    # +----+-------+
+    # | age|   name|
+    # +----+-------+
+    # |null|Michael|
+    # |  30|   Andy|
+    # |  19| Justin|
+    # +----+-------+
+    # $example off:global_temp_view$
+
 
 def schema_inference_example(spark):
     # $example on:schema_inferring$

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala
index 129b81d..f27c403 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala
@@ -135,6 +135,31 @@ object SparkSQLExample {
     // |  19| Justin|
     // +----+-------+
     // $example off:run_sql$
+
+    // $example on:global_temp_view$
+    // Register the DataFrame as a global temporary view
+    df.createGlobalTempView("people")
+
+    // Global temporary view is tied to a system preserved database `global_temp`
+    spark.sql("SELECT * FROM global_temp.people").show()
+    // +----+-------+
+    // | age|   name|
+    // +----+-------+
+    // |null|Michael|
+    // |  30|   Andy|
+    // |  19| Justin|
+    // +----+-------+
+
+    // Global temporary view is cross-session
+    spark.newSession().sql("SELECT * FROM global_temp.people").show()
+    // +----+-------+
+    // | age|   name|
+    // +----+-------+
+    // |null|Michael|
+    // |  30|   Andy|
+    // |  19| Justin|
+    // +----+-------+
+    // $example off:global_temp_view$
   }
 
   private def runDatasetCreationExample(spark: SparkSession): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 163e3f2..e3d9a17 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -55,7 +55,9 @@ object MimaExcludes {
       ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.getFunction"),
       ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.databaseExists"),
       ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.tableExists"),
-      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.functionExists")
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.functionExists"),
+      // [SPARK-17338][SQL] add global temp view
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropGlobalTempView")
     )
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/python/pyspark/sql/catalog.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index 3c50307..df3bf42 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -167,7 +167,7 @@ class Catalog(object):
 
     @since(2.0)
     def dropTempView(self, viewName):
-        """Drops the temporary view with the given view name in the catalog.
+        """Drops the local temporary view with the given view name in the catalog.
         If the view has been cached before, then it will also be uncached.
 
         >>> spark.createDataFrame([(1, 1)]).createTempView("my_table")
@@ -181,6 +181,22 @@ class Catalog(object):
         """
         self._jcatalog.dropTempView(viewName)
 
+    @since(2.1)
+    def dropGlobalTempView(self, viewName):
+        """Drops the global temporary view with the given view name in the catalog.
+        If the view has been cached before, then it will also be uncached.
+
+        >>> spark.createDataFrame([(1, 1)]).createGlobalTempView("my_table")
+        >>> spark.table("global_temp.my_table").collect()
+        [Row(_1=1, _2=1)]
+        >>> spark.catalog.dropGlobalTempView("my_table")
+        >>> spark.table("global_temp.my_table") # doctest: +IGNORE_EXCEPTION_DETAIL
+        Traceback (most recent call last):
+            ...
+        AnalysisException: ...
+        """
+        self._jcatalog.dropGlobalTempView(viewName)
+
     @ignore_unicode_prefix
     @since(2.0)
     def registerFunction(self, name, f, returnType=StringType()):

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/python/pyspark/sql/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 7482be8..8264dcf 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -386,7 +386,7 @@ class SQLContext(object):
         >>> sqlContext.registerDataFrameAsTable(df, "table1")
         >>> df2 = sqlContext.tables()
         >>> df2.filter("tableName = 'table1'").first()
-        Row(tableName=u'table1', isTemporary=True)
+        Row(database=u'', tableName=u'table1', isTemporary=True)
         """
         if dbName is None:
             return DataFrame(self._ssql_ctx.tables(), self)

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 0ac481a..14e80ea 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -131,7 +131,7 @@ class DataFrame(object):
 
     @since(2.0)
     def createTempView(self, name):
-        """Creates a temporary view with this DataFrame.
+        """Creates a local temporary view with this DataFrame.
 
         The lifetime of this temporary table is tied to the :class:`SparkSession`
         that was used to create this :class:`DataFrame`.
@@ -153,7 +153,7 @@ class DataFrame(object):
 
     @since(2.0)
     def createOrReplaceTempView(self, name):
-        """Creates or replaces a temporary view with this DataFrame.
+        """Creates or replaces a local temporary view with this DataFrame.
 
         The lifetime of this temporary table is tied to the :class:`SparkSession`
         that was used to create this :class:`DataFrame`.
@@ -169,6 +169,27 @@ class DataFrame(object):
         """
         self._jdf.createOrReplaceTempView(name)
 
+    @since(2.1)
+    def createGlobalTempView(self, name):
+        """Creates a global temporary view with this DataFrame.
+
+        The lifetime of this temporary view is tied to this Spark application.
+        throws :class:`TempTableAlreadyExistsException`, if the view name already exists in the
+        catalog.
+
+        >>> df.createGlobalTempView("people")
+        >>> df2 = spark.sql("select * from global_temp.people")
+        >>> sorted(df.collect()) == sorted(df2.collect())
+        True
+        >>> df.createGlobalTempView("people")  # doctest: +IGNORE_EXCEPTION_DETAIL
+        Traceback (most recent call last):
+        ...
+        AnalysisException: u"Temporary table 'people' already exists;"
+        >>> spark.catalog.dropGlobalTempView("people")
+
+        """
+        self._jdf.createGlobalTempView(name)
+
     @property
     @since(1.4)
     def write(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index a3bbace..b599a88 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -111,11 +111,12 @@ statement
     | ALTER TABLE tableIdentifier RECOVER PARTITIONS                   #recoverPartitions
     | DROP TABLE (IF EXISTS)? tableIdentifier PURGE?                   #dropTable
     | DROP VIEW (IF EXISTS)? tableIdentifier                           #dropTable
-    | CREATE (OR REPLACE)? TEMPORARY? VIEW (IF NOT EXISTS)? tableIdentifier
+    | CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
+        VIEW (IF NOT EXISTS)? tableIdentifier
         identifierCommentList? (COMMENT STRING)?
         (PARTITIONED ON identifierList)?
         (TBLPROPERTIES tablePropertyList)? AS query                    #createView
-    | CREATE (OR REPLACE)? TEMPORARY VIEW
+    | CREATE (OR REPLACE)? GLOBAL? TEMPORARY VIEW
         tableIdentifier ('(' colTypeList ')')? tableProvider
         (OPTIONS tablePropertyList)?                                   #createTempViewUsing
     | ALTER VIEW tableIdentifier AS? query                             #alterViewQuery
@@ -676,7 +677,7 @@ nonReserved
     | MAP | ARRAY | STRUCT
     | LATERAL | WINDOW | REDUCE | TRANSFORM | USING | SERDE | SERDEPROPERTIES | RECORDREADER
     | DELIMITED | FIELDS | TERMINATED | COLLECTION | ITEMS | KEYS | ESCAPED | LINES | SEPARATED
-    | EXTENDED | REFRESH | CLEAR | CACHE | UNCACHE | LAZY | TEMPORARY | OPTIONS
+    | EXTENDED | REFRESH | CLEAR | CACHE | UNCACHE | LAZY | GLOBAL | TEMPORARY | OPTIONS
     | GROUPING | CUBE | ROLLUP
     | EXPLAIN | FORMAT | LOGICAL | FORMATTED | CODEGEN
     | TABLESAMPLE | USE | TO | BUCKET | PERCENTLIT | OUT | OF
@@ -864,6 +865,7 @@ CACHE: 'CACHE';
 UNCACHE: 'UNCACHE';
 LAZY: 'LAZY';
 FORMATTED: 'FORMATTED';
+GLOBAL: 'GLOBAL';
 TEMPORARY: 'TEMPORARY' | 'TEMP';
 OPTIONS: 'OPTIONS';
 UNSET: 'UNSET';

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ae8869f..536d387 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -458,12 +458,12 @@ class Analyzer(
         i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
       case u: UnresolvedRelation =>
         val table = u.tableIdentifier
-        if (table.database.isDefined && conf.runSQLonFile &&
+        if (table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) &&
             (!catalog.databaseExists(table.database.get) || !catalog.tableExists(table))) {
-          // If the table does not exist, and the database part is specified, and we support
-          // running SQL directly on files, then let's just return the original UnresolvedRelation.
-          // It is possible we are matching a query like "select * from parquet.`/path/to/query`".
-          // The plan will get resolved later.
+          // If the database part is specified, and we support running SQL directly on files, and
+          // it's not a temporary view, and the table does not exist, then let's just return the
+          // original UnresolvedRelation. It is possible we are matching a query like "select *
+          // from parquet.`/path/to/query`". The plan will get resolved later.
           // Note that we are testing (!db_exists || !table_exists) because the catalog throws
           // an exception from tableExists if the database does not exist.
           u

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala
new file mode 100644
index 0000000..6095ac0
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util.StringUtils
+
+
+/**
+ * A thread-safe manager for global temporary views, providing atomic operations to manage them,
+ * e.g. create, update, remove, etc.
+ *
+ * Note that, the view name is always case-sensitive here, callers are responsible to format the
+ * view name w.r.t. case-sensitive config.
+ *
+ * @param database The system preserved virtual database that keeps all the global temporary views.
+ */
+class GlobalTempViewManager(val database: String) {
+
+  /** List of view definitions, mapping from view name to logical plan. */
+  @GuardedBy("this")
+  private val viewDefinitions = new mutable.HashMap[String, LogicalPlan]
+
+  /**
+   * Returns the global view definition which matches the given name, or None if not found.
+   */
+  def get(name: String): Option[LogicalPlan] = synchronized {
+    viewDefinitions.get(name)
+  }
+
+  /**
+   * Creates a global temp view, or issues an exception if the view already exists and
+   * `overrideIfExists` is false.
+   */
+  def create(
+      name: String,
+      viewDefinition: LogicalPlan,
+      overrideIfExists: Boolean): Unit = synchronized {
+    if (!overrideIfExists && viewDefinitions.contains(name)) {
+      throw new TempTableAlreadyExistsException(name)
+    }
+    viewDefinitions.put(name, viewDefinition)
+  }
+
+  /**
+   * Updates the global temp view if it exists, returns true if updated, false otherwise.
+   */
+  def update(
+      name: String,
+      viewDefinition: LogicalPlan): Boolean = synchronized {
+    if (viewDefinitions.contains(name)) {
+      viewDefinitions.put(name, viewDefinition)
+      true
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Removes the global temp view if it exists, returns true if removed, false otherwise.
+   */
+  def remove(name: String): Boolean = synchronized {
+    viewDefinitions.remove(name).isDefined
+  }
+
+  /**
+   * Renames the global temp view if the source view exists and the destination view does not
+   * exist, or throws an exception if the source view exists but the destination view already
+   * exists. Returns true if renamed, false otherwise.
+   */
+  def rename(oldName: String, newName: String): Boolean = synchronized {
+    if (viewDefinitions.contains(oldName)) {
+      if (viewDefinitions.contains(newName)) {
+        throw new AnalysisException(
+          s"rename temporary view from '$oldName' to '$newName': destination view already exists")
+      }
+
+      val viewDefinition = viewDefinitions(oldName)
+      viewDefinitions.remove(oldName)
+      viewDefinitions.put(newName, viewDefinition)
+      true
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Lists the names of all global temporary views that match the given pattern.
+   */
+  def listViewNames(pattern: String): Seq[String] = synchronized {
+    StringUtils.filterPattern(viewDefinitions.keys.toSeq, pattern)
+  }
+
+  /**
+   * Clears all the global temporary views.
+   */
+  def clear(): Unit = synchronized {
+    viewDefinitions.clear()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 8c01c7a..e44e30e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.GLOBAL_TEMP_DATABASE
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
@@ -47,6 +48,7 @@ object SessionCatalog {
  */
 class SessionCatalog(
     externalCatalog: ExternalCatalog,
+    globalTempViewManager: GlobalTempViewManager,
     functionResourceLoader: FunctionResourceLoader,
     functionRegistry: FunctionRegistry,
     conf: CatalystConf,
@@ -61,6 +63,7 @@ class SessionCatalog(
       conf: CatalystConf) {
     this(
       externalCatalog,
+      new GlobalTempViewManager(GLOBAL_TEMP_DATABASE.defaultValueString),
       DummyFunctionResourceLoader,
       functionRegistry,
       conf,
@@ -142,8 +145,13 @@ class SessionCatalog(
   // ----------------------------------------------------------------------------
 
   def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
-    val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
     val dbName = formatDatabaseName(dbDefinition.name)
+    if (dbName == globalTempViewManager.database) {
+      throw new AnalysisException(
+        s"${globalTempViewManager.database} is a system preserved database, " +
+          "you cannot create a database with this name.")
+    }
+    val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
     externalCatalog.createDatabase(
       dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
       ignoreIfExists)
@@ -154,7 +162,7 @@ class SessionCatalog(
     if (dbName == DEFAULT_DATABASE) {
       throw new AnalysisException(s"Can not drop default database")
     } else if (dbName == getCurrentDatabase) {
-      throw new AnalysisException(s"Can not drop current database `${dbName}`")
+      throw new AnalysisException(s"Can not drop current database `$dbName`")
     }
     externalCatalog.dropDatabase(dbName, ignoreIfNotExists, cascade)
   }
@@ -188,6 +196,13 @@ class SessionCatalog(
 
   def setCurrentDatabase(db: String): Unit = {
     val dbName = formatDatabaseName(db)
+    if (dbName == globalTempViewManager.database) {
+      throw new AnalysisException(
+        s"${globalTempViewManager.database} is a system preserved database, " +
+          "you cannot use it as current database. To access global temporary views, you should " +
+          "use qualified name with the GLOBAL_TEMP_DATABASE, e.g. SELECT * FROM " +
+          s"${globalTempViewManager.database}.viewName.")
+    }
     requireDbExists(dbName)
     synchronized { currentDb = dbName }
   }
@@ -329,7 +344,7 @@ class SessionCatalog(
   // ----------------------------------------------
 
   /**
-   * Create a temporary table.
+   * Create a local temporary view.
    */
   def createTempView(
       name: String,
@@ -343,19 +358,65 @@ class SessionCatalog(
   }
 
   /**
-   * Return a temporary view exactly as it was stored.
+   * Create a global temporary view.
+   */
+  def createGlobalTempView(
+      name: String,
+      viewDefinition: LogicalPlan,
+      overrideIfExists: Boolean): Unit = {
+    globalTempViewManager.create(formatTableName(name), viewDefinition, overrideIfExists)
+  }
+
+  /**
+   * Alter the definition of a local/global temp view matching the given name, returns true if a
+   * temp view is matched and altered, false otherwise.
+   */
+  def alterTempViewDefinition(
+      name: TableIdentifier,
+      viewDefinition: LogicalPlan): Boolean = synchronized {
+    val viewName = formatTableName(name.table)
+    if (name.database.isEmpty) {
+      if (tempTables.contains(viewName)) {
+        createTempView(viewName, viewDefinition, overrideIfExists = true)
+        true
+      } else {
+        false
+      }
+    } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
+      globalTempViewManager.update(viewName, viewDefinition)
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Return a local temporary view exactly as it was stored.
    */
   def getTempView(name: String): Option[LogicalPlan] = synchronized {
     tempTables.get(formatTableName(name))
   }
 
   /**
-   * Drop a temporary view.
+   * Return a global temporary view exactly as it was stored.
+   */
+  def getGlobalTempView(name: String): Option[LogicalPlan] = {
+    globalTempViewManager.get(formatTableName(name))
+  }
+
+  /**
+   * Drop a local temporary view.
    */
   def dropTempView(name: String): Unit = synchronized {
     tempTables.remove(formatTableName(name))
   }
 
+  /**
+   * Drop a global temporary view.
+   */
+  def dropGlobalTempView(name: String): Boolean = {
+    globalTempViewManager.remove(formatTableName(name))
+  }
+
   // -------------------------------------------------------------
   // | Methods that interact with temporary and metastore tables |
   // -------------------------------------------------------------
@@ -371,9 +432,7 @@ class SessionCatalog(
    */
   def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
     val table = formatTableName(name.table)
-    if (name.database.isDefined) {
-      getTableMetadata(name)
-    } else {
+    if (name.database.isEmpty) {
       getTempView(table).map { plan =>
         CatalogTable(
           identifier = TableIdentifier(table),
@@ -381,6 +440,16 @@ class SessionCatalog(
           storage = CatalogStorageFormat.empty,
           schema = plan.output.toStructType)
       }.getOrElse(getTableMetadata(name))
+    } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
+      globalTempViewManager.get(table).map { plan =>
+        CatalogTable(
+          identifier = TableIdentifier(table, Some(globalTempViewManager.database)),
+          tableType = CatalogTableType.VIEW,
+          storage = CatalogStorageFormat.empty,
+          schema = plan.output.toStructType)
+      }.getOrElse(throw new NoSuchTableException(globalTempViewManager.database, table))
+    } else {
+      getTableMetadata(name)
     }
   }
 
@@ -393,21 +462,25 @@ class SessionCatalog(
    */
   def renameTable(oldName: TableIdentifier, newName: String): Unit = synchronized {
     val db = formatDatabaseName(oldName.database.getOrElse(currentDb))
-    requireDbExists(db)
     val oldTableName = formatTableName(oldName.table)
     val newTableName = formatTableName(newName)
-    if (oldName.database.isDefined || !tempTables.contains(oldTableName)) {
-      requireTableExists(TableIdentifier(oldTableName, Some(db)))
-      requireTableNotExists(TableIdentifier(newTableName, Some(db)))
-      externalCatalog.renameTable(db, oldTableName, newTableName)
+    if (db == globalTempViewManager.database) {
+      globalTempViewManager.rename(oldTableName, newTableName)
     } else {
-      if (tempTables.contains(newTableName)) {
-        throw new AnalysisException(
-          s"RENAME TEMPORARY TABLE from '$oldName' to '$newName': destination table already exists")
+      requireDbExists(db)
+      if (oldName.database.isDefined || !tempTables.contains(oldTableName)) {
+        requireTableExists(TableIdentifier(oldTableName, Some(db)))
+        requireTableNotExists(TableIdentifier(newTableName, Some(db)))
+        externalCatalog.renameTable(db, oldTableName, newTableName)
+      } else {
+        if (tempTables.contains(newTableName)) {
+          throw new AnalysisException(s"RENAME TEMPORARY TABLE from '$oldName' to '$newName': " +
+            "destination table already exists")
+        }
+        val table = tempTables(oldTableName)
+        tempTables.remove(oldTableName)
+        tempTables.put(newTableName, table)
       }
-      val table = tempTables(oldTableName)
-      tempTables.remove(oldTableName)
-      tempTables.put(newTableName, table)
     }
   }
 
@@ -424,17 +497,24 @@ class SessionCatalog(
       purge: Boolean): Unit = synchronized {
     val db = formatDatabaseName(name.database.getOrElse(currentDb))
     val table = formatTableName(name.table)
-    if (name.database.isDefined || !tempTables.contains(table)) {
-      requireDbExists(db)
-      // When ignoreIfNotExists is false, no exception is issued when the table does not exist.
-      // Instead, log it as an error message.
-      if (tableExists(TableIdentifier(table, Option(db)))) {
-        externalCatalog.dropTable(db, table, ignoreIfNotExists = true, purge = purge)
-      } else if (!ignoreIfNotExists) {
-        throw new NoSuchTableException(db = db, table = table)
+    if (db == globalTempViewManager.database) {
+      val viewExists = globalTempViewManager.remove(table)
+      if (!viewExists && !ignoreIfNotExists) {
+        throw new NoSuchTableException(globalTempViewManager.database, table)
       }
     } else {
-      tempTables.remove(table)
+      if (name.database.isDefined || !tempTables.contains(table)) {
+        requireDbExists(db)
+        // When ignoreIfNotExists is false, an exception is thrown if the table does not exist;
+        // otherwise a missing table is silently ignored.
+        if (tableExists(TableIdentifier(table, Option(db)))) {
+          externalCatalog.dropTable(db, table, ignoreIfNotExists = true, purge = purge)
+        } else if (!ignoreIfNotExists) {
+          throw new NoSuchTableException(db = db, table = table)
+        }
+      } else {
+        tempTables.remove(table)
+      }
     }
   }
 
@@ -445,6 +525,9 @@ class SessionCatalog(
    * If no database is specified, this will first attempt to return a temporary table/view with
    * the same name, then, if that does not exist, return the table/view from the current database.
    *
+   * Note that, the global temp view database is also valid here, this will return the global temp
+   * view matching the given name.
+   *
    * If the relation is a view, the relation will be wrapped in a [[SubqueryAlias]] which will
    * track the name of the view.
    */
@@ -453,7 +536,11 @@ class SessionCatalog(
       val db = formatDatabaseName(name.database.getOrElse(currentDb))
       val table = formatTableName(name.table)
       val relationAlias = alias.getOrElse(table)
-      if (name.database.isDefined || !tempTables.contains(table)) {
+      if (db == globalTempViewManager.database) {
+        globalTempViewManager.get(table).map { viewDef =>
+          SubqueryAlias(relationAlias, viewDef, Some(name))
+        }.getOrElse(throw new NoSuchTableException(db, table))
+      } else if (name.database.isDefined || !tempTables.contains(table)) {
         val metadata = externalCatalog.getTable(db, table)
         val view = Option(metadata.tableType).collect {
           case CatalogTableType.VIEW => name
@@ -472,27 +559,48 @@ class SessionCatalog(
    * explicitly specified.
    */
   def isTemporaryTable(name: TableIdentifier): Boolean = synchronized {
-    name.database.isEmpty && tempTables.contains(formatTableName(name.table))
+    val table = formatTableName(name.table)
+    if (name.database.isEmpty) {
+      tempTables.contains(table)
+    } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
+      globalTempViewManager.get(table).isDefined
+    } else {
+      false
+    }
   }
 
   /**
-   * List all tables in the specified database, including temporary tables.
+   * List all tables in the specified database, including local temporary tables.
+   *
+   * Note that, if the specified database is global temporary view database, we will list global
+   * temporary views.
    */
   def listTables(db: String): Seq[TableIdentifier] = listTables(db, "*")
 
   /**
-   * List all matching tables in the specified database, including temporary tables.
+   * List all matching tables in the specified database, including local temporary tables.
+   *
+   * Note that, if the specified database is global temporary view database, we will list global
+   * temporary views.
    */
   def listTables(db: String, pattern: String): Seq[TableIdentifier] = {
     val dbName = formatDatabaseName(db)
-    requireDbExists(dbName)
-    val dbTables =
-      externalCatalog.listTables(dbName, pattern).map { t => TableIdentifier(t, Some(dbName)) }
-    synchronized {
-      val _tempTables = StringUtils.filterPattern(tempTables.keys.toSeq, pattern)
-        .map { t => TableIdentifier(t) }
-      dbTables ++ _tempTables
+    val dbTables = if (dbName == globalTempViewManager.database) {
+      globalTempViewManager.listViewNames(pattern).map { name =>
+        TableIdentifier(name, Some(globalTempViewManager.database))
+      }
+    } else {
+      requireDbExists(dbName)
+      externalCatalog.listTables(dbName, pattern).map { name =>
+        TableIdentifier(name, Some(dbName))
+      }
+    }
+    val localTempViews = synchronized {
+      StringUtils.filterPattern(tempTables.keys.toSeq, pattern).map { name =>
+        TableIdentifier(name)
+      }
     }
+    dbTables ++ localTempViews
   }
 
   /**
@@ -504,6 +612,8 @@ class SessionCatalog(
     // If the database is not defined, there is a good chance this is a temp table.
     if (name.database.isEmpty) {
       tempTables.get(formatTableName(name.table)).foreach(_.refresh())
+    } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
+      globalTempViewManager.get(formatTableName(name.table)).foreach(_.refresh())
     }
   }
 
@@ -919,6 +1029,7 @@ class SessionCatalog(
       }
     }
     tempTables.clear()
+    globalTempViewManager.clear()
     functionRegistry.clear()
     // restore built-in functions
     FunctionRegistry.builtin.listFunction().foreach { f =>

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 9cfbdff..4b52508 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.usePrettyExpression
 import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
-import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand}
+import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
 import org.apache.spark.sql.execution.python.EvaluatePython
@@ -2433,9 +2433,13 @@ class Dataset[T] private[sql](
   }
 
   /**
-   * Creates a temporary view using the given name. The lifetime of this
+   * Creates a local temporary view using the given name. The lifetime of this
    * temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
    *
+   * Local temporary view is session-scoped. Its lifetime is the lifetime of the session that
+   * created it, i.e. it will be automatically dropped when the session terminates. It's not
+   * tied to any databases, i.e. we can't use `db1.view1` to reference a local temporary view.
+   *
    * @throws AnalysisException if the view name already exists
    *
    * @group basic
@@ -2443,21 +2447,51 @@ class Dataset[T] private[sql](
    */
   @throws[AnalysisException]
   def createTempView(viewName: String): Unit = withPlan {
-    createViewCommand(viewName, replace = false)
+    createTempViewCommand(viewName, replace = false, global = false)
   }
 
+
+
   /**
-   * Creates a temporary view using the given name. The lifetime of this
+   * Creates a local temporary view using the given name. The lifetime of this
    * temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
    *
    * @group basic
    * @since 2.0.0
    */
   def createOrReplaceTempView(viewName: String): Unit = withPlan {
-    createViewCommand(viewName, replace = true)
+    createTempViewCommand(viewName, replace = true, global = false)
   }
 
-  private def createViewCommand(viewName: String, replace: Boolean): CreateViewCommand = {
+  /**
+   * Creates a global temporary view using the given name. The lifetime of this
+   * temporary view is tied to this Spark application.
+   *
+   * Global temporary view is cross-session. Its lifetime is the lifetime of the Spark application,
+   * i.e. it will be automatically dropped when the application terminates. It's tied to a system
+   * preserved database `global_temp`, and we must use the qualified name to refer to a global
+   * temp view, e.g. `SELECT * FROM global_temp.view1`.
+   *
+   * @throws TempTableAlreadyExistsException if the view name already exists
+   *
+   * @group basic
+   * @since 2.1.0
+   */
+  @throws[AnalysisException]
+  def createGlobalTempView(viewName: String): Unit = withPlan {
+    createTempViewCommand(viewName, replace = false, global = true)
+  }
+
+  private def createTempViewCommand(
+      viewName: String,
+      replace: Boolean,
+      global: Boolean): CreateViewCommand = {
+    val viewType = if (global) {
+      GlobalTempView
+    } else {
+      LocalTempView
+    }
+
     CreateViewCommand(
       name = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName),
       userSpecifiedColumns = Nil,
@@ -2467,7 +2501,7 @@ class Dataset[T] private[sql](
       child = logicalPlan,
       allowExisting = false,
       replace = replace,
-      isTemporary = true)
+      viewType = viewType)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 7f2762c..717fb29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -262,15 +262,33 @@ abstract class Catalog {
       options: Map[String, String]): DataFrame
 
   /**
-   * Drops the temporary view with the given view name in the catalog.
+   * Drops the local temporary view with the given view name in the catalog.
    * If the view has been cached before, then it will also be uncached.
    *
+   * Local temporary view is session-scoped. Its lifetime is the lifetime of the session that
+   * created it, i.e. it will be automatically dropped when the session terminates. It's not
+   * tied to any databases, i.e. we can't use `db1.view1` to reference a local temporary view.
+   *
    * @param viewName the name of the view to be dropped.
    * @since 2.0.0
    */
   def dropTempView(viewName: String): Unit
 
   /**
+   * Drops the global temporary view with the given view name in the catalog.
+   * If the view has been cached before, then it will also be uncached.
+   *
+   * Global temporary view is cross-session. Its lifetime is the lifetime of the Spark application,
+   * i.e. it will be automatically dropped when the application terminates. It's tied to a system
+   * preserved database `global_temp`, and we must use the qualified name to refer to a global
+   * temp view, e.g. `SELECT * FROM global_temp.view1`.
+   *
+   * @param viewName the name of the view to be dropped.
+   * @since 2.1.0
+   */
+  def dropGlobalTempView(viewName: String): Boolean
+
+  /**
    * Returns true if the table is currently cached in-memory.
    *
    * @since 2.0.0

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 383b3a2..cb45a6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -21,15 +21,14 @@ import java.nio.charset.StandardCharsets
 import java.sql.Timestamp
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec}
+import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
 import org.apache.spark.util.Utils
 
@@ -125,6 +124,9 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
               .mkString("\t")
         }
       }
+    // Hive's SHOW TABLES only outputs table names, while ours outputs database, table name, isTemp.
+    case command: ExecutedCommandExec if command.cmd.isInstanceOf[ShowTablesCommand] =>
+      command.executeCollect().map(_.getString(1))
     case command: ExecutedCommandExec =>
       command.executeCollect().map(_.getString(0))
     case other =>

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 5f87b71..be2eddb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -29,9 +29,9 @@ import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{CreateTable, CreateTempViewUsing, _}
+import org.apache.spark.sql.execution.datasources.{CreateTable, _}
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.StructType
 
 /**
  * Concrete parser for Spark SQL statements.
@@ -385,7 +385,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
         logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " +
           "CREATE TEMPORARY VIEW ... USING ... instead")
-        CreateTempViewUsing(table, schema, replace = true, provider, options)
+        CreateTempViewUsing(table, schema, replace = true, global = false, provider, options)
       } else {
         CreateTable(tableDesc, mode, None)
       }
@@ -401,6 +401,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       tableIdent = visitTableIdentifier(ctx.tableIdentifier()),
       userSpecifiedSchema = Option(ctx.colTypeList()).map(createSchema),
       replace = ctx.REPLACE != null,
+      global = ctx.GLOBAL != null,
       provider = ctx.tableProvider.qualifiedName.getText,
       options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
   }
@@ -1269,7 +1270,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    *
    * For example:
    * {{{
-   *   CREATE [OR REPLACE] [TEMPORARY] VIEW [IF NOT EXISTS] [db_name.]view_name
+   *   CREATE [OR REPLACE] [[GLOBAL] TEMPORARY] VIEW [IF NOT EXISTS] [db_name.]view_name
    *   [(column_name [COMMENT column_comment], ...) ]
    *   [COMMENT view_comment]
    *   [TBLPROPERTIES (property_name = property_value, ...)]
@@ -1286,6 +1287,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         }
       }
 
+      val viewType = if (ctx.TEMPORARY == null) {
+        PersistedView
+      } else if (ctx.GLOBAL != null) {
+        GlobalTempView
+      } else {
+        LocalTempView
+      }
+
       CreateViewCommand(
         name = visitTableIdentifier(ctx.tableIdentifier),
         userSpecifiedColumns = userSpecifiedColumns,
@@ -1295,7 +1304,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         child = plan(ctx.query),
         allowExisting = ctx.EXISTS != null,
         replace = ctx.REPLACE != null,
-        isTemporary = ctx.TEMPORARY != null)
+        viewType = viewType)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 01ac898..45fa293 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -183,17 +183,20 @@ case class DropTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
-    // issue an exception.
-    catalog.getTableMetadataOption(tableName).map(_.tableType match {
-      case CatalogTableType.VIEW if !isView =>
-        throw new AnalysisException(
-          "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
-      case o if o != CatalogTableType.VIEW && isView =>
-        throw new AnalysisException(
-          s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
-      case _ =>
-    })
+
+    if (!catalog.isTemporaryTable(tableName) && catalog.tableExists(tableName)) {
+      // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
+      // issue an exception.
+      catalog.getTableMetadata(tableName).tableType match {
+        case CatalogTableType.VIEW if !isView =>
+          throw new AnalysisException(
+            "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
+        case o if o != CatalogTableType.VIEW && isView =>
+          throw new AnalysisException(
+            s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
+        case _ =>
+      }
+    }
     try {
       sparkSession.sharedState.cacheManager.uncacheQuery(
         sparkSession.table(tableName.quotedString))

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 08de6cd..424ef58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -579,9 +579,10 @@ case class ShowTablesCommand(
     databaseName: Option[String],
     tableIdentifierPattern: Option[String]) extends RunnableCommand {
 
-  // The result of SHOW TABLES has two columns, tableName and isTemporary.
+  // The result of SHOW TABLES has three columns: database, tableName and isTemporary.
   override val output: Seq[Attribute] = {
-    AttributeReference("tableName", StringType, nullable = false)() ::
+    AttributeReference("database", StringType, nullable = false)() ::
+      AttributeReference("tableName", StringType, nullable = false)() ::
       AttributeReference("isTemporary", BooleanType, nullable = false)() :: Nil
   }
 
@@ -592,9 +593,9 @@ case class ShowTablesCommand(
     val db = databaseName.getOrElse(catalog.getCurrentDatabase)
     val tables =
       tableIdentifierPattern.map(catalog.listTables(db, _)).getOrElse(catalog.listTables(db))
-    tables.map { t =>
-      val isTemp = t.database.isEmpty
-      Row(t.table, isTemp)
+    tables.map { tableIdent =>
+      val isTemp = catalog.isTemporaryTable(tableIdent)
+      Row(tableIdent.database.getOrElse(""), tableIdent.table, isTemp)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 15340ee..bbcd9c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -19,13 +19,46 @@ package org.apache.spark.sql.execution.command
 
 import scala.util.control.NonFatal
 
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
+import org.apache.spark.sql.types.{MetadataBuilder, StructType}
+
+
+/**
+ * ViewType is used to specify the expected view type when we want to create or replace a view in
+ * [[CreateViewCommand]].
+ */
+sealed trait ViewType
+
+/**
+ * LocalTempView means session-scoped local temporary views. Its lifetime is the lifetime of the
+ * session that created it, i.e. it will be automatically dropped when the session terminates. It's
+ * not tied to any databases, i.e. we can't use `db1.view1` to reference a local temporary view.
+ */
+object LocalTempView extends ViewType
+
+/**
+ * GlobalTempView means cross-session global temporary views. Its lifetime is the lifetime of the
+ * Spark application, i.e. it will be automatically dropped when the application terminates. It's
+ * tied to a system preserved database `_global_temp`, and we must use the qualified name to refer
+ * to a global temp view, e.g. SELECT * FROM _global_temp.view1.
+ */
+object GlobalTempView extends ViewType
+
+/**
+ * PersistedView means cross-session persisted views. Persisted views stay until they are
+ * explicitly dropped by user command. It's always tied to a database, default to the current
+ * database if not specified.
+ *
+ * Note that an existing persisted view with the same name is not visible to the current session
+ * while the local temporary view exists, unless the view name is qualified by database.
+ */
+object PersistedView extends ViewType
 
 
 /**
@@ -46,10 +79,7 @@ import org.apache.spark.sql.types.StructType
  *                already exists, throws analysis exception.
  * @param replace if true, and if the view already exists, updates it; if false, and if the view
  *                already exists, throws analysis exception.
- * @param isTemporary if true, the view is created as a temporary view. Temporary views are dropped
- *                 at the end of current Spark session. Existing permanent relations with the same
- *                 name are not visible to the current session while the temporary view exists,
- *                 unless they are specified with full qualified table name with database prefix.
+ * @param viewType the expected view type to be created with this command.
  */
 case class CreateViewCommand(
     name: TableIdentifier,
@@ -60,20 +90,21 @@ case class CreateViewCommand(
     child: LogicalPlan,
     allowExisting: Boolean,
     replace: Boolean,
-    isTemporary: Boolean)
+    viewType: ViewType)
   extends RunnableCommand {
 
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
 
-  if (!isTemporary) {
-    require(originalText.isDefined,
-      "The table to created with CREATE VIEW must have 'originalText'.")
+  if (viewType == PersistedView) {
+    require(originalText.isDefined, "'originalText' must be provided to create permanent view")
   }
 
   if (allowExisting && replace) {
     throw new AnalysisException("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.")
   }
 
+  private def isTemporary = viewType == LocalTempView || viewType == GlobalTempView
+
   // Disallows 'CREATE TEMPORARY VIEW IF NOT EXISTS' to be consistent with 'CREATE TEMPORARY TABLE'
   if (allowExisting && isTemporary) {
     throw new AnalysisException(
@@ -99,72 +130,53 @@ case class CreateViewCommand(
         s"(num: `${analyzedPlan.output.length}`) does not match the number of column names " +
         s"specified by CREATE VIEW (num: `${userSpecifiedColumns.length}`).")
     }
-    val sessionState = sparkSession.sessionState
-
-    if (isTemporary) {
-      createTemporaryView(sparkSession, analyzedPlan)
-    } else {
-      // Adds default database for permanent table if it doesn't exist, so that tableExists()
-      // only check permanent tables.
-      val database = name.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-      val qualifiedName = name.copy(database = Option(database))
-
-      if (sessionState.catalog.tableExists(qualifiedName)) {
-        val tableMetadata = sessionState.catalog.getTableMetadata(qualifiedName)
-        if (allowExisting) {
-          // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
-          // already exists.
-        } else if (tableMetadata.tableType != CatalogTableType.VIEW) {
-          throw new AnalysisException(s"$qualifiedName is not a view")
-        } else if (replace) {
-          // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
-          sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
-        } else {
-          // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
-          // exists.
-          throw new AnalysisException(
-            s"View $qualifiedName already exists. If you want to update the view definition, " +
-              "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS")
-        }
-      } else {
-        // Create the view if it doesn't exist.
-        sessionState.catalog.createTable(
-          prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
-      }
-    }
-    Seq.empty[Row]
-  }
-
-  private def createTemporaryView(sparkSession: SparkSession, analyzedPlan: LogicalPlan): Unit = {
-    val catalog = sparkSession.sessionState.catalog
 
-    // Projects column names to alias names
-    val logicalPlan = if (userSpecifiedColumns.isEmpty) {
+    val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
       analyzedPlan
     } else {
       val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
-        case (attr, (colName, _)) => Alias(attr, colName)()
+        case (attr, (colName, None)) => Alias(attr, colName)()
+        case (attr, (colName, Some(colComment))) =>
+          val meta = new MetadataBuilder().putString("comment", colComment).build()
+          Alias(attr, colName)(explicitMetadata = Some(meta))
       }
       sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
     }
 
-    catalog.createTempView(name.table, logicalPlan, replace)
+    val catalog = sparkSession.sessionState.catalog
+    if (viewType == LocalTempView) {
+      catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace)
+    } else if (viewType == GlobalTempView) {
+      catalog.createGlobalTempView(name.table, aliasedPlan, overrideIfExists = replace)
+    } else if (catalog.tableExists(name)) {
+      val tableMetadata = catalog.getTableMetadata(name)
+      if (allowExisting) {
+        // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
+        // already exists.
+      } else if (tableMetadata.tableType != CatalogTableType.VIEW) {
+        throw new AnalysisException(s"$name is not a view")
+      } else if (replace) {
+        // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
+        catalog.alterTable(prepareTable(sparkSession, aliasedPlan))
+      } else {
+        // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
+        // exists.
+        throw new AnalysisException(
+          s"View $name already exists. If you want to update the view definition, " +
+            "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS")
+      }
+    } else {
+      // Create the view if it doesn't exist.
+      catalog.createTable(prepareTable(sparkSession, aliasedPlan), ignoreIfExists = false)
+    }
+    Seq.empty[Row]
   }
 
   /**
    * Returns a [[CatalogTable]] that can be used to save in the catalog. This comment canonicalize
    * SQL based on the analyzed plan, and also creates the proper schema for the view.
    */
-  private def prepareTable(sparkSession: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
-    val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
-      analyzedPlan
-    } else {
-      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
-        case (attr, (colName, _)) => Alias(attr, colName)()
-      }
-      sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
-    }
-
+  private def prepareTable(sparkSession: SparkSession, aliasedPlan: LogicalPlan): CatalogTable = {
     val viewSQL: String = new SQLBuilder(aliasedPlan).toSQL
 
     // Validate the view SQL - make sure we can parse it and analyze it.
@@ -176,19 +188,11 @@ case class CreateViewCommand(
         throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
     }
 
-    val viewSchema = if (userSpecifiedColumns.isEmpty) {
-      aliasedPlan.schema
-    } else {
-      StructType(aliasedPlan.schema.zip(userSpecifiedColumns).map {
-        case (field, (_, comment)) => comment.map(field.withComment).getOrElse(field)
-      })
-    }
-
     CatalogTable(
       identifier = name,
       tableType = CatalogTableType.VIEW,
       storage = CatalogStorageFormat.empty,
-      schema = viewSchema,
+      schema = aliasedPlan.schema,
       properties = properties,
       viewOriginalText = originalText,
       viewText = Some(viewSQL),
@@ -222,8 +226,8 @@ case class AlterViewAsCommand(
     qe.assertAnalyzed()
     val analyzedPlan = qe.analyzed
 
-    if (session.sessionState.catalog.isTemporaryTable(name)) {
-      session.sessionState.catalog.createTempView(name.table, analyzedPlan, overrideIfExists = true)
+    if (session.sessionState.catalog.alterTempViewDefinition(name, analyzedPlan)) {
+      // a local/global temp view has been altered, we are done.
     } else {
       alterPermanentView(session, analyzedPlan)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index fa95af2..59fb48f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -40,16 +40,20 @@ case class CreateTable(
   override def innerChildren: Seq[QueryPlan[_]] = query.toSeq
 }
 
+/**
+ * Create or replace a local/global temporary view with given data source.
+ */
 case class CreateTempViewUsing(
     tableIdent: TableIdentifier,
     userSpecifiedSchema: Option[StructType],
     replace: Boolean,
+    global: Boolean,
     provider: String,
     options: Map[String, String]) extends RunnableCommand {
 
   if (tableIdent.database.isDefined) {
     throw new AnalysisException(
-      s"Temporary table '$tableIdent' should not have specified a database")
+      s"Temporary view '$tableIdent' should not have specified a database")
   }
 
   def run(sparkSession: SparkSession): Seq[Row] = {
@@ -58,10 +62,16 @@ case class CreateTempViewUsing(
       userSpecifiedSchema = userSpecifiedSchema,
       className = provider,
       options = options)
-    sparkSession.sessionState.catalog.createTempView(
-      tableIdent.table,
-      Dataset.ofRows(sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan,
-      replace)
+
+    val catalog = sparkSession.sessionState.catalog
+    val viewDefinition = Dataset.ofRows(
+      sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan
+
+    if (global) {
+      catalog.createGlobalTempView(tableIdent.table, viewDefinition, replace)
+    } else {
+      catalog.createTempView(tableIdent.table, viewDefinition, replace)
+    }
 
     Seq.empty[Row]
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index e412e1b..c05bda3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -94,20 +94,19 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    */
   @throws[AnalysisException]("database does not exist")
   override def listTables(dbName: String): Dataset[Table] = {
-    requireDatabaseExists(dbName)
     val tables = sessionCatalog.listTables(dbName).map(makeTable)
     CatalogImpl.makeDataset(tables, sparkSession)
   }
 
   private def makeTable(tableIdent: TableIdentifier): Table = {
     val metadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdent)
-    val database = metadata.identifier.database
+    val isTemp = sessionCatalog.isTemporaryTable(tableIdent)
     new Table(
       name = tableIdent.table,
-      database = database.orNull,
+      database = metadata.identifier.database.orNull,
       description = metadata.comment.orNull,
-      tableType = if (database.isEmpty) "TEMPORARY" else metadata.tableType.name,
-      isTemporary = database.isEmpty)
+      tableType = if (isTemp) "TEMPORARY" else metadata.tableType.name,
+      isTemporary = isTemp)
   }
 
   /**
@@ -365,7 +364,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   }
 
   /**
-   * Drops the temporary view with the given view name in the catalog.
+   * Drops the local temporary view with the given view name in the catalog.
    * If the view has been cached/persisted before, it's also unpersisted.
    *
    * @param viewName the name of the view to be dropped.
@@ -380,6 +379,21 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   }
 
   /**
+   * Drops the global temporary view with the given view name in the catalog.
+   * If the view has been cached/persisted before, it's also unpersisted.
+   *
+   * @param viewName the name of the view to be dropped.
+   * @group ddl_ops
+   * @since 2.1.0
+   */
+  override def dropGlobalTempView(viewName: String): Boolean = {
+    sparkSession.sessionState.catalog.getGlobalTempView(viewName).exists { viewDef =>
+      sparkSession.sharedState.cacheManager.uncacheQuery(Dataset.ofRows(sparkSession, viewDef))
+      sessionCatalog.dropGlobalTempView(viewName)
+    }
+  }
+
+  /**
    * Returns true if the table is currently cached in-memory.
    *
    * @group cachemgmt

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 9f7d001..8759dfe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -95,6 +95,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
    */
   lazy val catalog = new SessionCatalog(
     sparkSession.sharedState.externalCatalog,
+    sparkSession.sharedState.globalTempViewManager,
     functionResourceLoader,
     functionRegistry,
     conf,

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 6387f01..c555a43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -22,11 +22,11 @@ import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, SQLContext}
-import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
+import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, GlobalTempViewManager, InMemoryCatalog}
 import org.apache.spark.sql.execution.CacheManager
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
@@ -37,39 +37,14 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
  */
 private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
 
-  /**
-   * Class for caching query results reused in future executions.
-   */
-  val cacheManager: CacheManager = new CacheManager
-
-  /**
-   * A listener for SQL-specific [[org.apache.spark.scheduler.SparkListenerEvent]]s.
-   */
-  val listener: SQLListener = createListenerAndUI(sparkContext)
-
+  // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
+  // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
   {
     val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
     if (configFile != null) {
       sparkContext.hadoopConfiguration.addResource(configFile)
     }
-  }
-
-  /**
-   * A catalog that interacts with external systems.
-   */
-  lazy val externalCatalog: ExternalCatalog =
-    SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
-      SharedState.externalCatalogClassName(sparkContext.conf),
-      sparkContext.conf,
-      sparkContext.hadoopConfiguration)
-
-  /**
-   * A classloader used to load all user-added jar.
-   */
-  val jarClassLoader = new NonClosableMutableURLClassLoader(
-    org.apache.spark.util.Utils.getContextOrSparkClassLoader)
 
-  {
     // Set the Hive metastore warehouse path to the one we use
     val tempConf = new SQLConf
     sparkContext.conf.getAll.foreach { case (k, v) => tempConf.setConfString(k, v) }
@@ -94,6 +69,48 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
   }
 
   /**
+   * Class for caching query results reused in future executions.
+   */
+  val cacheManager: CacheManager = new CacheManager
+
+  /**
+   * A listener for SQL-specific [[org.apache.spark.scheduler.SparkListenerEvent]]s.
+   */
+  val listener: SQLListener = createListenerAndUI(sparkContext)
+
+  /**
+   * A catalog that interacts with external systems.
+   */
+  val externalCatalog: ExternalCatalog =
+    SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
+      SharedState.externalCatalogClassName(sparkContext.conf),
+      sparkContext.conf,
+      sparkContext.hadoopConfiguration)
+
+  /**
+   * A manager for global temporary views.
+   */
+  val globalTempViewManager = {
+    // System preserved database should not exist in metastore. However it's hard to guarantee it
+    // for every session, because case-sensitivity differs. Here we always lowercase it to make our
+    // life easier.
+    val globalTempDB = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase
+    if (externalCatalog.databaseExists(globalTempDB)) {
+      throw new SparkException(
+        s"$globalTempDB is a system preserved database, please rename your existing database " +
+          "to resolve the name conflict, or set a different value for " +
+          s"${GLOBAL_TEMP_DATABASE.key}, and launch your Spark application again.")
+    }
+    new GlobalTempViewManager(globalTempDB)
+  }
+
+  /**
+   * A classloader used to load all user-added jar.
+   */
+  val jarClassLoader = new NonClosableMutableURLClassLoader(
+    org.apache.spark.util.Utils.getContextOrSparkClassLoader)
+
+  /**
    * Create a SQLListener then add it into SparkContext, and create a SQLTab if there is SparkUI.
    */
   private def createListenerAndUI(sc: SparkContext): SQLListener = {

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index 001c1a1..2b35db4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -88,11 +88,11 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
     df.createOrReplaceTempView("listtablessuitetable")
     assert(
       sqlContext.tables().filter("tableName = 'listtablessuitetable'").collect().toSeq ==
-      Row("listtablessuitetable", true) :: Nil)
+      Row("", "listtablessuitetable", true) :: Nil)
 
     assert(
       sqlContext.sql("SHOW tables").filter("tableName = 'listtablessuitetable'").collect().toSeq ==
-      Row("listtablessuitetable", true) :: Nil)
+      Row("", "listtablessuitetable", true) :: Nil)
 
     sqlContext.sessionState.catalog.dropTable(
       TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge = false)
@@ -105,11 +105,11 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
     df.createOrReplaceTempView("listtablessuitetable")
     assert(
       sqlContext.tables("default").filter("tableName = 'listtablessuitetable'").collect().toSeq ==
-      Row("listtablessuitetable", true) :: Nil)
+      Row("", "listtablessuitetable", true) :: Nil)
 
     assert(
       sqlContext.sql("show TABLES in default").filter("tableName = 'listtablessuitetable'")
-        .collect().toSeq == Row("listtablessuitetable", true) :: Nil)
+        .collect().toSeq == Row("", "listtablessuitetable", true) :: Nil)
 
     sqlContext.sessionState.catalog.dropTable(
       TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge = false)
@@ -122,7 +122,8 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
     df.createOrReplaceTempView("listtablessuitetable")
 
     val expectedSchema = StructType(
-      StructField("tableName", StringType, false) ::
+      StructField("database", StringType, false) ::
+        StructField("tableName", StringType, false) ::
         StructField("isTemporary", BooleanType, false) :: Nil)
 
     Seq(sqlContext.tables(), sqlContext.sql("SHOW TABLes")).foreach {

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
new file mode 100644
index 0000000..391bcb8
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalog.Table
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
+
+/** Tests for global temp views, which are only reachable through the global temp database. */
+class GlobalTempViewSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  // Lazy: resolved on first use inside a test, where the shared `spark` session is available.
+  private lazy val globalTempDB: String = spark.sharedState.globalTempViewManager.database
+
+  test("basic semantic") {
+    try {
+      sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 'a'")
+      // If there is no database in table name, we should try local temp view first, if not found,
+      // try table/view in current database, which is "default" in this case. So we expect
+      // NoSuchTableException here.
+      intercept[NoSuchTableException](spark.table("src"))
+      // Use qualified name to refer to the global temp view explicitly.
+      checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a"))
+
+      // Table name without database will never refer to a global temp view.
+      intercept[NoSuchTableException](sql("DROP VIEW src"))
+      sql(s"DROP VIEW $globalTempDB.src")
+      // The global temp view should be dropped successfully.
+      intercept[NoSuchTableException](spark.table(s"$globalTempDB.src"))
+
+      // We can also use Dataset API to create global temp view
+      Seq(1 -> "a").toDF("i", "j").createGlobalTempView("src")
+      checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a"))
+
+      // Use qualified name to rename a global temp view.
+      sql(s"ALTER VIEW $globalTempDB.src RENAME TO src2")
+      intercept[NoSuchTableException](spark.table(s"$globalTempDB.src"))
+      checkAnswer(spark.table(s"$globalTempDB.src2"), Row(1, "a"))
+
+      // Use qualified name to alter a global temp view.
+      sql(s"ALTER VIEW $globalTempDB.src2 AS SELECT 2, 'b'")
+      checkAnswer(spark.table(s"$globalTempDB.src2"), Row(2, "b"))
+
+      // We can also use Catalog API to drop global temp view
+      spark.catalog.dropGlobalTempView("src2")
+      intercept[NoSuchTableException](spark.table(s"$globalTempDB.src2"))
+    } finally {
+      // Drop both names so that a failure part-way through does not leak a view into later tests.
+      Seq("src", "src2").foreach(spark.catalog.dropGlobalTempView(_))
+    }
+  }
+
+  test("global temp view is shared among all sessions") {
+    try {
+      sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 2")
+      checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, 2))
+      val newSession = spark.newSession()
+      checkAnswer(newSession.table(s"$globalTempDB.src"), Row(1, 2))
+    } finally {
+      spark.catalog.dropGlobalTempView("src")
+    }
+  }
+
+  test("global temp view database should be preserved") {
+    val e = intercept[AnalysisException](sql(s"CREATE DATABASE $globalTempDB"))
+    assert(e.message.contains("system preserved database"))
+
+    val e2 = intercept[AnalysisException](sql(s"USE $globalTempDB"))
+    assert(e2.message.contains("system preserved database"))
+  }
+
+  test("CREATE GLOBAL TEMP VIEW USING") {
+    withTempPath { path =>
+      try {
+        Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath)
+        sql(s"CREATE GLOBAL TEMP VIEW src USING parquet OPTIONS (PATH '${path.getAbsolutePath}')")
+        checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a"))
+        sql(s"INSERT INTO $globalTempDB.src SELECT 2, 'b'")
+        checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a") :: Row(2, "b") :: Nil)
+      } finally {
+        spark.catalog.dropGlobalTempView("src")
+      }
+    }
+  }
+
+  test("CREATE TABLE LIKE should work for global temp view") {
+    try {
+      sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1 AS a, '2' AS b")
+      sql(s"CREATE TABLE cloned LIKE $globalTempDB.src")
+      val tableMeta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("cloned"))
+      assert(tableMeta.schema == new StructType().add("a", "int", false).add("b", "string", false))
+    } finally {
+      spark.catalog.dropGlobalTempView("src")
+      // IF EXISTS: when CREATE TABLE itself failed, a plain DROP would throw and mask that error.
+      sql("DROP TABLE IF EXISTS default.cloned")
+    }
+  }
+
+  test("list global temp views") {
+    try {
+      sql("CREATE GLOBAL TEMP VIEW v1 AS SELECT 3, 4")
+      sql("CREATE TEMP VIEW v2 AS SELECT 1, 2")
+
+      // Listing the global temp database also returns session-local temp views (empty database).
+      checkAnswer(sql(s"SHOW TABLES IN $globalTempDB"),
+        Row(globalTempDB, "v1", true) :: Row("", "v2", true) :: Nil)
+      assert(spark.catalog.listTables(globalTempDB).collect().toSeq.map(_.name) == Seq("v1", "v2"))
+    } finally {
+      // v1 is the global view and v2 the session-local one; drop each through the matching API.
+      spark.catalog.dropGlobalTempView("v1")
+      spark.catalog.dropTempView("v2")
+    }
+  }
+
+  test("should lookup global temp view if and only if global temp db is specified") {
+    try {
+      sql("CREATE GLOBAL TEMP VIEW same_name AS SELECT 3, 4")
+      sql("CREATE TEMP VIEW same_name AS SELECT 1, 2")
+
+      checkAnswer(sql("SELECT * FROM same_name"), Row(1, 2))
+
+      // We never look up global temp views if the database is not specified in the table name.
+      spark.catalog.dropTempView("same_name")
+      intercept[AnalysisException](sql("SELECT * FROM same_name"))
+
+      // Use qualified name to look up a global temp view.
+      checkAnswer(sql(s"SELECT * FROM $globalTempDB.same_name"), Row(3, 4))
+    } finally {
+      spark.catalog.dropTempView("same_name")
+      spark.catalog.dropGlobalTempView("same_name")
+    }
+  }
+
+  test("public Catalog should recognize global temp view") {
+    try {
+      sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 2")
+
+      assert(spark.catalog.tableExists(globalTempDB, "src"))
+      assert(spark.catalog.getTable(globalTempDB, "src").toString == new Table(
+        name = "src",
+        database = globalTempDB,
+        description = null,
+        tableType = "TEMPORARY",
+        isTemporary = true).toString)
+    } finally {
+      spark.catalog.dropGlobalTempView("src")
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org