You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2015/05/27 22:09:37 UTC

spark git commit: [SPARK-7684] [SQL] Refactoring MetastoreDataSourcesSuite to workaround SPARK-7684

Repository: spark
Updated Branches:
  refs/heads/master 8161562ea -> b97ddff00


[SPARK-7684] [SQL] Refactoring MetastoreDataSourcesSuite to workaround SPARK-7684

As stated in SPARK-7684, currently `TestHive.reset` has some execution order specific bug, which makes running specific test suites locally pretty frustrating. This PR refactors `MetastoreDataSourcesSuite` (which relies on `TestHive.reset` heavily) using various `withXxx` utility methods in `SQLTestUtils` to ask each test case to cleanup their own mess so that we can avoid calling `TestHive.reset`.

Author: Cheng Lian <li...@databricks.com>
Author: Yin Huai <yh...@databricks.com>

Closes #6353 from liancheng/workaround-spark-7684 and squashes the following commits:

26939aa [Yin Huai] Move the initialization of jsonFilePath to beforeAll.
a423d48 [Cheng Lian] Fixes Scala style issue
dfe45d0 [Cheng Lian] Refactors MetastoreDataSourcesSuite to workaround SPARK-7684
92a116d [Cheng Lian] Fixes minor styling issues


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b97ddff0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b97ddff0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b97ddff0

Branch: refs/heads/master
Commit: b97ddff000b99adca3dd8fe13d01054fd5014fa0
Parents: 8161562
Author: Cheng Lian <li...@databricks.com>
Authored: Wed May 27 13:09:33 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed May 27 13:09:33 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/QueryTest.scala  |    4 +
 .../apache/spark/sql/test/SQLTestUtils.scala    |   12 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 1372 +++++++++---------
 3 files changed, 722 insertions(+), 666 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b97ddff0/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index bbf9ab1..98ba3c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -67,6 +67,10 @@ class QueryTest extends PlanTest {
     checkAnswer(df, Seq(expectedAnswer))
   }
 
+  protected def checkAnswer(df: DataFrame, expectedAnswer: DataFrame): Unit = {
+    checkAnswer(df, expectedAnswer.collect())
+  }
+
   def sqlTest(sqlString: String, expectedAnswer: Seq[Row])(implicit sqlContext: SQLContext) {
     test(sqlString) {
       checkAnswer(sqlContext.sql(sqlString), expectedAnswer)

http://git-wip-us.apache.org/repos/asf/spark/blob/b97ddff0/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index ca66cdc..17a8b0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -75,14 +75,18 @@ trait SQLTestUtils {
   /**
    * Drops temporary table `tableName` after calling `f`.
    */
-  protected def withTempTable(tableName: String)(f: => Unit): Unit = {
-    try f finally sqlContext.dropTempTable(tableName)
+  protected def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+    try f finally tableNames.foreach(sqlContext.dropTempTable)
   }
 
   /**
    * Drops table `tableName` after calling `f`.
    */
-  protected def withTable(tableName: String)(f: => Unit): Unit = {
-    try f finally sqlContext.sql(s"DROP TABLE IF EXISTS $tableName")
+  protected def withTable(tableNames: String*)(f: => Unit): Unit = {
+    try f finally {
+      tableNames.foreach { name =>
+        sqlContext.sql(s"DROP TABLE IF EXISTS $name")
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b97ddff0/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 9623ef0..58e2d1f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -21,770 +21,818 @@ import java.io.File
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.scalatest.BeforeAndAfterAll
+
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.InvalidInputException
-import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
+import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources.LogicalRelation
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 /**
  * Tests for persisting tables created though the data sources API into the metastore.
  */
-class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
+class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with BeforeAndAfterAll {
+  override val sqlContext = TestHive
+
+  var jsonFilePath: String = _
 
-  override def afterEach(): Unit = {
-    reset()
-    Utils.deleteRecursively(tempPath)
+  override def beforeAll(): Unit = {
+    jsonFilePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
   }
 
-  val filePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
-  var tempPath: File = Utils.createTempDir()
-  tempPath.delete()
-
-  test ("persistent JSON table") {
-    sql(
-      s"""
-        |CREATE TABLE jsonTable
-        |USING org.apache.spark.sql.json.DefaultSource
-        |OPTIONS (
-        |  path '${filePath}'
-        |)
-      """.stripMargin)
-
-    checkAnswer(
-      sql("SELECT * FROM jsonTable"),
-      read.json(filePath).collect().toSeq)
+  test("persistent JSON table") {
+    withTable("jsonTable") {
+      sql(
+        s"""CREATE TABLE jsonTable
+           |USING org.apache.spark.sql.json.DefaultSource
+           |OPTIONS (
+           |  path '$jsonFilePath'
+           |)
+         """.stripMargin)
+
+      checkAnswer(
+        sql("SELECT * FROM jsonTable"),
+        read.json(jsonFilePath).collect().toSeq)
+    }
   }
 
-  test ("persistent JSON table with a user specified schema") {
-    sql(
-      s"""
-        |CREATE TABLE jsonTable (
-        |a string,
-        |b String,
-        |`c_!@(3)` int,
-        |`<d>` Struct<`d!`:array<int>, `=`:array<struct<Dd2: boolean>>>)
-        |USING org.apache.spark.sql.json.DefaultSource
-        |OPTIONS (
-        |  path '${filePath}'
-        |)
-      """.stripMargin)
-
-    read.json(filePath).registerTempTable("expectedJsonTable")
-
-    checkAnswer(
-      sql("SELECT a, b, `c_!@(3)`, `<d>`.`d!`, `<d>`.`=` FROM jsonTable"),
-      sql("SELECT a, b, `c_!@(3)`, `<d>`.`d!`, `<d>`.`=` FROM expectedJsonTable").collect().toSeq)
+  test("persistent JSON table with a user specified schema") {
+    withTable("jsonTable") {
+      sql(
+        s"""CREATE TABLE jsonTable (
+           |a string,
+           |b String,
+           |`c_!@(3)` int,
+           |`<d>` Struct<`d!`:array<int>, `=`:array<struct<Dd2: boolean>>>)
+           |USING org.apache.spark.sql.json.DefaultSource
+           |OPTIONS (
+           |  path '$jsonFilePath'
+           |)
+         """.stripMargin)
+
+      withTempTable("expectedJsonTable") {
+        read.json(jsonFilePath).registerTempTable("expectedJsonTable")
+        checkAnswer(
+          sql("SELECT a, b, `c_!@(3)`, `<d>`.`d!`, `<d>`.`=` FROM jsonTable"),
+          sql("SELECT a, b, `c_!@(3)`, `<d>`.`d!`, `<d>`.`=` FROM expectedJsonTable"))
+      }
+    }
   }
 
-  test ("persistent JSON table with a user specified schema with a subset of fields") {
-    // This works because JSON objects are self-describing and JSONRelation can get needed
-    // field values based on field names.
-    sql(
-      s"""
-        |CREATE TABLE jsonTable (`<d>` Struct<`=`:array<struct<Dd2: boolean>>>, b String)
-        |USING org.apache.spark.sql.json.DefaultSource
-        |OPTIONS (
-        |  path '${filePath}'
-        |)
-      """.stripMargin)
-
-    val innerStruct = StructType(
-      StructField("=", ArrayType(StructType(StructField("Dd2", BooleanType, true) :: Nil))) :: Nil)
-    val expectedSchema = StructType(
-      StructField("<d>", innerStruct, true) ::
-      StructField("b", StringType, true) :: Nil)
-
-    assert(expectedSchema === table("jsonTable").schema)
-
-    read.json(filePath).registerTempTable("expectedJsonTable")
-
-    checkAnswer(
-      sql("SELECT b, `<d>`.`=` FROM jsonTable"),
-      sql("SELECT b, `<d>`.`=` FROM expectedJsonTable").collect().toSeq)
+  test("persistent JSON table with a user specified schema with a subset of fields") {
+    withTable("jsonTable") {
+      // This works because JSON objects are self-describing and JSONRelation can get needed
+      // field values based on field names.
+      sql(
+        s"""CREATE TABLE jsonTable (`<d>` Struct<`=`:array<struct<Dd2: boolean>>>, b String)
+           |USING org.apache.spark.sql.json.DefaultSource
+           |OPTIONS (
+           |  path '$jsonFilePath'
+           |)
+         """.stripMargin)
+
+      val innerStruct = StructType(Seq(
+        StructField("=", ArrayType(StructType(StructField("Dd2", BooleanType, true) :: Nil)))))
+
+      val expectedSchema = StructType(Seq(
+        StructField("<d>", innerStruct, true),
+        StructField("b", StringType, true)))
+
+      assert(expectedSchema === table("jsonTable").schema)
+
+      withTempTable("expectedJsonTable") {
+        read.json(jsonFilePath).registerTempTable("expectedJsonTable")
+        checkAnswer(
+          sql("SELECT b, `<d>`.`=` FROM jsonTable"),
+          sql("SELECT b, `<d>`.`=` FROM expectedJsonTable"))
+      }
+    }
   }
 
   test("resolve shortened provider names") {
-    sql(
-      s"""
-        |CREATE TABLE jsonTable
-        |USING org.apache.spark.sql.json
-        |OPTIONS (
-        |  path '${filePath}'
-        |)
-      """.stripMargin)
-
-    checkAnswer(
-      sql("SELECT * FROM jsonTable"),
-      read.json(filePath).collect().toSeq)
+    withTable("jsonTable") {
+      sql(
+        s"""
+           |CREATE TABLE jsonTable
+           |USING org.apache.spark.sql.json
+           |OPTIONS (
+           |  path '$jsonFilePath'
+           |)
+         """.stripMargin)
+
+      checkAnswer(
+        sql("SELECT * FROM jsonTable"),
+        read.json(jsonFilePath).collect().toSeq)
+    }
   }
 
   test("drop table") {
-    sql(
-      s"""
-        |CREATE TABLE jsonTable
-        |USING org.apache.spark.sql.json
-        |OPTIONS (
-        |  path '${filePath}'
-        |)
-      """.stripMargin)
-
-    checkAnswer(
-      sql("SELECT * FROM jsonTable"),
-      read.json(filePath).collect().toSeq)
-
-    sql("DROP TABLE jsonTable")
-
-    intercept[Exception] {
-      sql("SELECT * FROM jsonTable").collect()
-    }
+    withTable("jsonTable") {
+      sql(
+        s"""
+           |CREATE TABLE jsonTable
+           |USING org.apache.spark.sql.json
+           |OPTIONS (
+           |  path '$jsonFilePath'
+           |)
+         """.stripMargin)
+
+      checkAnswer(
+        sql("SELECT * FROM jsonTable"),
+        read.json(jsonFilePath))
+
+      sql("DROP TABLE jsonTable")
 
-    assert(
-      (new File(filePath)).exists(),
-      "The table with specified path is considered as an external table, " +
-        "its data should not deleted after DROP TABLE.")
+      intercept[Exception] {
+        sql("SELECT * FROM jsonTable").collect()
+      }
+
+      assert(
+        new File(jsonFilePath).exists(),
+        "The table with specified path is considered as an external table, " +
+          "its data should not deleted after DROP TABLE.")
+    }
   }
 
   test("check change without refresh") {
-    val tempDir = File.createTempFile("sparksql", "json", Utils.createTempDir())
-    tempDir.delete()
-    sparkContext.parallelize(("a", "b") :: Nil).toDF()
-      .toJSON.saveAsTextFile(tempDir.getCanonicalPath)
-
-    sql(
-      s"""
-        |CREATE TABLE jsonTable
-        |USING org.apache.spark.sql.json
-        |OPTIONS (
-        |  path '${tempDir.getCanonicalPath}'
-        |)
-      """.stripMargin)
-
-    checkAnswer(
-      sql("SELECT * FROM jsonTable"),
-      Row("a", "b"))
-
-    Utils.deleteRecursively(tempDir)
-    sparkContext.parallelize(("a1", "b1", "c1") :: Nil).toDF()
-      .toJSON.saveAsTextFile(tempDir.getCanonicalPath)
-
-    // Schema is cached so the new column does not show. The updated values in existing columns
-    // will show.
-    checkAnswer(
-      sql("SELECT * FROM jsonTable"),
-      Row("a1", "b1"))
-
-    sql("REFRESH TABLE jsonTable")
-
-    // Check that the refresh worked
-    checkAnswer(
-      sql("SELECT * FROM jsonTable"),
-      Row("a1", "b1", "c1"))
-    Utils.deleteRecursively(tempDir)
+    withTempPath { tempDir =>
+      withTable("jsonTable") {
+        (("a", "b") :: Nil).toDF().toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+        sql(
+          s"""CREATE TABLE jsonTable
+             |USING org.apache.spark.sql.json
+             |OPTIONS (
+             |  path '${tempDir.getCanonicalPath}'
+             |)
+           """.stripMargin)
+
+        checkAnswer(
+          sql("SELECT * FROM jsonTable"),
+          Row("a", "b"))
+
+        Utils.deleteRecursively(tempDir)
+        (("a1", "b1", "c1") :: Nil).toDF().toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+        // Schema is cached so the new column does not show. The updated values in existing columns
+        // will show.
+        checkAnswer(
+          sql("SELECT * FROM jsonTable"),
+          Row("a1", "b1"))
+
+        sql("REFRESH TABLE jsonTable")
+
+        // Check that the refresh worked
+        checkAnswer(
+          sql("SELECT * FROM jsonTable"),
+          Row("a1", "b1", "c1"))
+      }
+    }
   }
 
   test("drop, change, recreate") {
-    val tempDir = File.createTempFile("sparksql", "json", Utils.createTempDir())
-    tempDir.delete()
-    sparkContext.parallelize(("a", "b") :: Nil).toDF()
-      .toJSON.saveAsTextFile(tempDir.getCanonicalPath)
-
-    sql(
-      s"""
-        |CREATE TABLE jsonTable
-        |USING org.apache.spark.sql.json
-        |OPTIONS (
-        |  path '${tempDir.getCanonicalPath}'
-        |)
-      """.stripMargin)
-
-    checkAnswer(
-      sql("SELECT * FROM jsonTable"),
-      Row("a", "b"))
-
-    Utils.deleteRecursively(tempDir)
-    sparkContext.parallelize(("a", "b", "c") :: Nil).toDF()
-      .toJSON.saveAsTextFile(tempDir.getCanonicalPath)
-
-    sql("DROP TABLE jsonTable")
-
-    sql(
-      s"""
-        |CREATE TABLE jsonTable
-        |USING org.apache.spark.sql.json
-        |OPTIONS (
-        |  path '${tempDir.getCanonicalPath}'
-        |)
-      """.stripMargin)
-
-    // New table should reflect new schema.
-    checkAnswer(
-      sql("SELECT * FROM jsonTable"),
-      Row("a", "b", "c"))
-    Utils.deleteRecursively(tempDir)
+    withTempPath { tempDir =>
+      (("a", "b") :: Nil).toDF().toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+      withTable("jsonTable") {
+        sql(
+          s"""CREATE TABLE jsonTable
+             |USING org.apache.spark.sql.json
+             |OPTIONS (
+             |  path '${tempDir.getCanonicalPath}'
+             |)
+           """.stripMargin)
+
+        checkAnswer(
+          sql("SELECT * FROM jsonTable"),
+          Row("a", "b"))
+
+        Utils.deleteRecursively(tempDir)
+        (("a", "b", "c") :: Nil).toDF().toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+        sql("DROP TABLE jsonTable")
+
+        sql(
+          s"""CREATE TABLE jsonTable
+             |USING org.apache.spark.sql.json
+             |OPTIONS (
+             |  path '${tempDir.getCanonicalPath}'
+             |)
+           """.stripMargin)
+
+        // New table should reflect new schema.
+        checkAnswer(
+          sql("SELECT * FROM jsonTable"),
+          Row("a", "b", "c"))
+      }
+    }
   }
 
   test("invalidate cache and reload") {
-    sql(
-      s"""
-        |CREATE TABLE jsonTable (`c_!@(3)` int)
-        |USING org.apache.spark.sql.json.DefaultSource
-        |OPTIONS (
-        |  path '${filePath}'
-        |)
-      """.stripMargin)
+    withTable("jsonTable") {
+      sql(
+        s"""CREATE TABLE jsonTable (`c_!@(3)` int)
+           |USING org.apache.spark.sql.json.DefaultSource
+           |OPTIONS (
+           |  path '$jsonFilePath'
+           |)
+         """.stripMargin)
 
-    read.json(filePath).registerTempTable("expectedJsonTable")
+      withTempTable("expectedJsonTable") {
+        read.json(jsonFilePath).registerTempTable("expectedJsonTable")
 
-    checkAnswer(
-      sql("SELECT * FROM jsonTable"),
-      sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
+        checkAnswer(
+          sql("SELECT * FROM jsonTable"),
+          sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
 
-    // Discard the cached relation.
-    invalidateTable("jsonTable")
+        // Discard the cached relation.
+        invalidateTable("jsonTable")
 
-    checkAnswer(
-      sql("SELECT * FROM jsonTable"),
-      sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
+        checkAnswer(
+          sql("SELECT * FROM jsonTable"),
+          sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
 
-    invalidateTable("jsonTable")
-    val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil)
+        invalidateTable("jsonTable")
+        val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil)
 
-    assert(expectedSchema === table("jsonTable").schema)
+        assert(expectedSchema === table("jsonTable").schema)
+      }
+    }
   }
 
   test("CTAS") {
-    sql(
-      s"""
-        |CREATE TABLE jsonTable
-        |USING org.apache.spark.sql.json.DefaultSource
-        |OPTIONS (
-        |  path '${filePath}'
-        |)
-      """.stripMargin)
-
-    sql(
-      s"""
-        |CREATE TABLE ctasJsonTable
-        |USING org.apache.spark.sql.json.DefaultSource
-        |OPTIONS (
-        |  path '${tempPath}'
-        |) AS
-        |SELECT * FROM jsonTable
-      """.stripMargin)
-
-    assert(table("ctasJsonTable").schema === table("jsonTable").schema)
-
-    checkAnswer(
-      sql("SELECT * FROM ctasJsonTable"),
-      sql("SELECT * FROM jsonTable").collect())
+    withTempPath { tempPath =>
+      withTable("jsonTable", "ctasJsonTable") {
+        sql(
+          s"""CREATE TABLE jsonTable
+             |USING org.apache.spark.sql.json.DefaultSource
+             |OPTIONS (
+             |  path '$jsonFilePath'
+             |)
+           """.stripMargin)
+
+        sql(
+          s"""CREATE TABLE ctasJsonTable
+             |USING org.apache.spark.sql.json.DefaultSource
+             |OPTIONS (
+             |  path '$tempPath'
+             |) AS
+             |SELECT * FROM jsonTable
+           """.stripMargin)
+
+        assert(table("ctasJsonTable").schema === table("jsonTable").schema)
+
+        checkAnswer(
+          sql("SELECT * FROM ctasJsonTable"),
+          sql("SELECT * FROM jsonTable").collect())
+      }
+    }
   }
 
   test("CTAS with IF NOT EXISTS") {
-    sql(
-      s"""
-        |CREATE TABLE jsonTable
-        |USING org.apache.spark.sql.json.DefaultSource
-        |OPTIONS (
-        |  path '${filePath}'
-        |)
-      """.stripMargin)
-
-    sql(
-      s"""
-        |CREATE TABLE ctasJsonTable
-        |USING org.apache.spark.sql.json.DefaultSource
-        |OPTIONS (
-        |  path '${tempPath}'
-        |) AS
-        |SELECT * FROM jsonTable
-      """.stripMargin)
-
-    // Create the table again should trigger a AnalysisException.
-    val message = intercept[AnalysisException] {
-      sql(
-        s"""
-        |CREATE TABLE ctasJsonTable
-        |USING org.apache.spark.sql.json.DefaultSource
-        |OPTIONS (
-        |  path '${tempPath}'
-        |) AS
-        |SELECT * FROM jsonTable
-      """.stripMargin)
-    }.getMessage
-    assert(message.contains("Table ctasJsonTable already exists."),
-      "We should complain that ctasJsonTable already exists")
-
-    // The following statement should be fine if it has IF NOT EXISTS.
-    // It tries to create a table ctasJsonTable with a new schema.
-    // The actual table's schema and data should not be changed.
-    sql(
-      s"""
-        |CREATE TABLE IF NOT EXISTS ctasJsonTable
-        |USING org.apache.spark.sql.json.DefaultSource
-        |OPTIONS (
-        |  path '${tempPath}'
-        |) AS
-        |SELECT a FROM jsonTable
-      """.stripMargin)
-
-    // Discard the cached relation.
-    invalidateTable("ctasJsonTable")
-
-    // Schema should not be changed.
-    assert(table("ctasJsonTable").schema === table("jsonTable").schema)
-    // Table data should not be changed.
-    checkAnswer(
-      sql("SELECT * FROM ctasJsonTable"),
-      sql("SELECT * FROM jsonTable").collect())
+    withTempPath { path =>
+      val tempPath = path.getCanonicalPath
+
+      withTable("jsonTable", "ctasJsonTable") {
+        sql(
+          s"""CREATE TABLE jsonTable
+             |USING org.apache.spark.sql.json.DefaultSource
+             |OPTIONS (
+             |  path '$jsonFilePath'
+             |)
+           """.stripMargin)
+
+        sql(
+          s"""CREATE TABLE ctasJsonTable
+             |USING org.apache.spark.sql.json.DefaultSource
+             |OPTIONS (
+             |  path '$tempPath'
+             |) AS
+             |SELECT * FROM jsonTable
+           """.stripMargin)
+
+        // Create the table again should trigger a AnalysisException.
+        val message = intercept[AnalysisException] {
+          sql(
+            s"""CREATE TABLE ctasJsonTable
+               |USING org.apache.spark.sql.json.DefaultSource
+               |OPTIONS (
+               |  path '$tempPath'
+               |) AS
+               |SELECT * FROM jsonTable
+             """.stripMargin)
+        }.getMessage
+
+        assert(
+          message.contains("Table ctasJsonTable already exists."),
+          "We should complain that ctasJsonTable already exists")
+
+        // The following statement should be fine if it has IF NOT EXISTS.
+        // It tries to create a table ctasJsonTable with a new schema.
+        // The actual table's schema and data should not be changed.
+        sql(
+          s"""CREATE TABLE IF NOT EXISTS ctasJsonTable
+             |USING org.apache.spark.sql.json.DefaultSource
+             |OPTIONS (
+             |  path '$tempPath'
+             |) AS
+             |SELECT a FROM jsonTable
+           """.stripMargin)
+
+        // Discard the cached relation.
+        invalidateTable("ctasJsonTable")
+
+        // Schema should not be changed.
+        assert(table("ctasJsonTable").schema === table("jsonTable").schema)
+        // Table data should not be changed.
+        checkAnswer(
+          sql("SELECT * FROM ctasJsonTable"),
+          sql("SELECT * FROM jsonTable").collect())
+      }
+    }
   }
 
   test("CTAS a managed table") {
-    sql(
-      s"""
-        |CREATE TABLE jsonTable
-        |USING org.apache.spark.sql.json.DefaultSource
-        |OPTIONS (
-        |  path '${filePath}'
-        |)
-      """.stripMargin)
-
-    val expectedPath = catalog.hiveDefaultTableFilePath("ctasJsonTable")
-    val filesystemPath = new Path(expectedPath)
-    val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
-    if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
-
-    // It is a managed table when we do not specify the location.
-    sql(
-      s"""
-        |CREATE TABLE ctasJsonTable
-        |USING org.apache.spark.sql.json.DefaultSource
-        |AS
-        |SELECT * FROM jsonTable
-      """.stripMargin)
-
-    assert(fs.exists(filesystemPath), s"$expectedPath should exist after we create the table.")
-
-    sql(
-      s"""
-        |CREATE TABLE loadedTable
-        |USING org.apache.spark.sql.json.DefaultSource
-        |OPTIONS (
-        |  path '${expectedPath}'
-        |)
-      """.stripMargin)
-
-    assert(table("ctasJsonTable").schema === table("loadedTable").schema)
-
-    checkAnswer(
-      sql("SELECT * FROM ctasJsonTable"),
-      sql("SELECT * FROM loadedTable").collect()
-    )
-
-    sql("DROP TABLE ctasJsonTable")
-    assert(!fs.exists(filesystemPath), s"$expectedPath should not exist after we drop the table.")
-  }
-
-  test("SPARK-5286 Fail to drop an invalid table when using the data source API") {
-    sql(
-      s"""
-        |CREATE TABLE jsonTable
-        |USING org.apache.spark.sql.json.DefaultSource
-        |OPTIONS (
-        |  path 'it is not a path at all!'
-        |)
-      """.stripMargin)
-
-    sql("DROP TABLE jsonTable").collect().foreach(println)
-  }
-
-  test("SPARK-5839 HiveMetastoreCatalog does not recognize table aliases of data source tables.") {
-    val originalDefaultSource = conf.defaultDataSourceName
-
-    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    val df = read.json(rdd)
-
-    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
-    // Save the df as a managed table (by not specifiying the path).
-    df.write.saveAsTable("savedJsonTable")
-
-    checkAnswer(
-      sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
-      (1 to 4).map(i => Row(i, s"str${i}")))
+    withTable("jsonTable", "ctasJsonTable", "loadedTable") {
+      sql(
+        s"""CREATE TABLE jsonTable
+           |USING org.apache.spark.sql.json.DefaultSource
+           |OPTIONS (
+           |  path '$jsonFilePath'
+           |)
+         """.stripMargin)
+
+      val expectedPath = catalog.hiveDefaultTableFilePath("ctasJsonTable")
+      val filesystemPath = new Path(expectedPath)
+      val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
+      if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
+
+      // It is a managed table when we do not specify the location.
+      sql(
+        s"""CREATE TABLE ctasJsonTable
+           |USING org.apache.spark.sql.json.DefaultSource
+           |AS
+           |SELECT * FROM jsonTable
+         """.stripMargin)
 
-    checkAnswer(
-      sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
-      (6 to 10).map(i => Row(i, s"str${i}")))
+      assert(fs.exists(filesystemPath), s"$expectedPath should exist after we create the table.")
 
-    invalidateTable("savedJsonTable")
+      sql(
+        s"""CREATE TABLE loadedTable
+           |USING org.apache.spark.sql.json.DefaultSource
+           |OPTIONS (
+           |  path '$expectedPath'
+           |)
+         """.stripMargin)
 
-    checkAnswer(
-      sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
-      (1 to 4).map(i => Row(i, s"str${i}")))
+      assert(table("ctasJsonTable").schema === table("loadedTable").schema)
 
-    checkAnswer(
-      sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
-      (6 to 10).map(i => Row(i, s"str${i}")))
+      checkAnswer(
+        sql("SELECT * FROM ctasJsonTable"),
+        sql("SELECT * FROM loadedTable"))
 
-    // Drop table will also delete the data.
-    sql("DROP TABLE savedJsonTable")
+      sql("DROP TABLE ctasJsonTable")
+      assert(!fs.exists(filesystemPath), s"$expectedPath should not exist after we drop the table.")
+    }
+  }
 
-    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
+  test("SPARK-5286 Fail to drop an invalid table when using the data source API") {
+    withTable("jsonTable") {
+      sql(
+        s"""CREATE TABLE jsonTable
+           |USING org.apache.spark.sql.json.DefaultSource
+           |OPTIONS (
+           |  path 'it is not a path at all!'
+           |)
+         """.stripMargin)
+
+      sql("DROP TABLE jsonTable").collect().foreach(println)
+    }
   }
 
-  test("save table") {
-    val originalDefaultSource = conf.defaultDataSourceName
+  test("SPARK-5839 HiveMetastoreCatalog does not recognize table aliases of data source tables.") {
+    withTable("savedJsonTable") {
+      // Save the df as a managed table (by not specifying the path).
+      (1 to 10)
+        .map(i => i -> s"str$i")
+        .toDF("a", "b")
+        .write
+        .format("json")
+        .saveAsTable("savedJsonTable")
 
-    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    val df = read.json(rdd)
+      checkAnswer(
+        sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
+        (1 to 4).map(i => Row(i, s"str$i")))
 
-    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
-    // Save the df as a managed table (by not specifiying the path).
-    df.write.saveAsTable("savedJsonTable")
+      checkAnswer(
+        sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
+        (6 to 10).map(i => Row(i, s"str$i")))
 
-    checkAnswer(
-      sql("SELECT * FROM savedJsonTable"),
-      df.collect())
+      invalidateTable("savedJsonTable")
 
-    // Right now, we cannot append to an existing JSON table.
-    intercept[RuntimeException] {
-      df.write.mode(SaveMode.Append).saveAsTable("savedJsonTable")
-    }
+      checkAnswer(
+        sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
+        (1 to 4).map(i => Row(i, s"str$i")))
 
-    // We can overwrite it.
-    df.write.mode(SaveMode.Overwrite).saveAsTable("savedJsonTable")
-    checkAnswer(
-      sql("SELECT * FROM savedJsonTable"),
-      df.collect())
-
-    // When the save mode is Ignore, we will do nothing when the table already exists.
-    df.select("b").write.mode(SaveMode.Ignore).saveAsTable("savedJsonTable")
-    assert(df.schema === table("savedJsonTable").schema)
-    checkAnswer(
-      sql("SELECT * FROM savedJsonTable"),
-      df.collect())
-
-    // Drop table will also delete the data.
-    sql("DROP TABLE savedJsonTable")
-    intercept[InvalidInputException] {
-      read.json(catalog.hiveDefaultTableFilePath("savedJsonTable"))
+      checkAnswer(
+        sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
+        (6 to 10).map(i => Row(i, s"str$i")))
     }
+  }
 
-    // Create an external table by specifying the path.
-    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
-    df.write
-      .format("org.apache.spark.sql.json")
-      .mode(SaveMode.Append)
-      .option("path", tempPath.toString)
-      .saveAsTable("savedJsonTable")
-    checkAnswer(
-      sql("SELECT * FROM savedJsonTable"),
-      df.collect())
-
-    // Data should not be deleted after we drop the table.
-    sql("DROP TABLE savedJsonTable")
-    checkAnswer(
-      read.json(tempPath.toString),
-      df.collect())
-
-    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
+  test("save table") {
+    withTempPath { path =>
+      val tempPath = path.getCanonicalPath
+
+      withTable("savedJsonTable") {
+        val df = (1 to 10).map(i => i -> s"str$i").toDF("a", "b")
+
+        withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME -> "json") {
+          // Save the df as a managed table (by not specifying the path).
+          df.write.saveAsTable("savedJsonTable")
+
+          checkAnswer(sql("SELECT * FROM savedJsonTable"), df)
+
+          // Right now, we cannot append to an existing JSON table.
+          intercept[RuntimeException] {
+            df.write.mode(SaveMode.Append).saveAsTable("savedJsonTable")
+          }
+
+          // We can overwrite it.
+          df.write.mode(SaveMode.Overwrite).saveAsTable("savedJsonTable")
+          checkAnswer(sql("SELECT * FROM savedJsonTable"), df)
+
+          // When the save mode is Ignore, we will do nothing when the table already exists.
+          df.select("b").write.mode(SaveMode.Ignore).saveAsTable("savedJsonTable")
+          assert(df.schema === table("savedJsonTable").schema)
+          checkAnswer(sql("SELECT * FROM savedJsonTable"), df)
+
+          // Drop table will also delete the data.
+          sql("DROP TABLE savedJsonTable")
+          intercept[InvalidInputException] {
+            read.json(catalog.hiveDefaultTableFilePath("savedJsonTable"))
+          }
+        }
+
+        // Create an external table by specifying the path.
+        withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME -> "not a source name") {
+          df.write
+            .format("org.apache.spark.sql.json")
+            .mode(SaveMode.Append)
+            .option("path", tempPath.toString)
+            .saveAsTable("savedJsonTable")
+
+          checkAnswer(sql("SELECT * FROM savedJsonTable"), df)
+        }
+
+        // Data should not be deleted after we drop the table.
+        sql("DROP TABLE savedJsonTable")
+        checkAnswer(read.json(tempPath.toString), df)
+      }
+    }
   }
 
   test("create external table") {
-    val originalDefaultSource = conf.defaultDataSourceName
-
-    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    val df = read.json(rdd)
-
-    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
-    df.write.format("org.apache.spark.sql.json")
-      .mode(SaveMode.Append)
-      .option("path", tempPath.toString)
-      .saveAsTable("savedJsonTable")
-
-    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
-    createExternalTable("createdJsonTable", tempPath.toString)
-    assert(table("createdJsonTable").schema === df.schema)
-    checkAnswer(
-      sql("SELECT * FROM createdJsonTable"),
-      df.collect())
-
-    var message = intercept[AnalysisException] {
-      createExternalTable("createdJsonTable", filePath.toString)
-    }.getMessage
-    assert(message.contains("Table createdJsonTable already exists."),
-      "We should complain that ctasJsonTable already exists")
-
-    // Data should not be deleted.
-    sql("DROP TABLE createdJsonTable")
-    checkAnswer(
-      read.json(tempPath.toString),
-      df.collect())
-
-    // Try to specify the schema.
-    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
-    val schema = StructType(StructField("b", StringType, true) :: Nil)
-    createExternalTable(
-      "createdJsonTable",
-      "org.apache.spark.sql.json",
-      schema,
-      Map("path" -> tempPath.toString))
-    checkAnswer(
-      sql("SELECT * FROM createdJsonTable"),
-      sql("SELECT b FROM savedJsonTable").collect())
-
-    sql("DROP TABLE createdJsonTable")
-
-    message = intercept[RuntimeException] {
-      createExternalTable(
-        "createdJsonTable",
-        "org.apache.spark.sql.json",
-        schema,
-        Map.empty[String, String])
-    }.getMessage
-    assert(
-      message.contains("'path' must be specified for json data."),
-      "We should complain that path is not specified.")
-
-    sql("DROP TABLE savedJsonTable")
-    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
+    withTempPath { tempPath =>
+      withTable("savedJsonTable", "createdJsonTable") {
+        val df = read.json(sparkContext.parallelize((1 to 10).map { i =>
+          s"""{ "a": $i, "b": "str$i" }"""
+        }))
+
+        withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME -> "not a source name") {
+          df.write
+            .format("json")
+            .mode(SaveMode.Append)
+            .option("path", tempPath.toString)
+            .saveAsTable("savedJsonTable")
+        }
+
+        withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME -> "json") {
+          createExternalTable("createdJsonTable", tempPath.toString)
+          assert(table("createdJsonTable").schema === df.schema)
+          checkAnswer(sql("SELECT * FROM createdJsonTable"), df)
+
+          assert(
+            intercept[AnalysisException] {
+              createExternalTable("createdJsonTable", jsonFilePath.toString)
+            }.getMessage.contains("Table createdJsonTable already exists."),
+            "We should complain that createdJsonTable already exists")
+        }
+
+        // Data should not be deleted.
+        sql("DROP TABLE createdJsonTable")
+        checkAnswer(read.json(tempPath.toString), df)
+
+        // Try to specify the schema.
+        withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME -> "not a source name") {
+          val schema = StructType(StructField("b", StringType, true) :: Nil)
+          createExternalTable(
+            "createdJsonTable",
+            "org.apache.spark.sql.json",
+            schema,
+            Map("path" -> tempPath.toString))
+
+          checkAnswer(
+            sql("SELECT * FROM createdJsonTable"),
+            sql("SELECT b FROM savedJsonTable"))
+
+          sql("DROP TABLE createdJsonTable")
+
+          assert(
+            intercept[RuntimeException] {
+              createExternalTable(
+                "createdJsonTable",
+                "org.apache.spark.sql.json",
+                schema,
+                Map.empty[String, String])
+            }.getMessage.contains("'path' must be specified for json data."),
+            "We should complain that path is not specified.")
+        }
+      }
+    }
   }
 
   if (HiveShim.version == "0.13.1") {
     test("scan a parquet table created through a CTAS statement") {
-      val originalConvertMetastore = getConf("spark.sql.hive.convertMetastoreParquet", "true")
-      val originalUseDataSource = getConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
-      setConf("spark.sql.hive.convertMetastoreParquet", "true")
-      setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
-
-      val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-      read.json(rdd).registerTempTable("jt")
-      sql(
-        """
-          |create table test_parquet_ctas STORED AS parquET
-          |AS select tmp.a from jt tmp where tmp.a < 5
-        """.stripMargin)
-
-      checkAnswer(
-        sql(s"SELECT a FROM test_parquet_ctas WHERE a > 2 "),
-        Row(3) :: Row(4) :: Nil
-      )
-
-      table("test_parquet_ctas").queryExecution.optimizedPlan match {
-        case LogicalRelation(p: ParquetRelation2) => // OK
-        case _ =>
-          fail(
-            "test_parquet_ctas should be converted to " +
-            s"${classOf[ParquetRelation2].getCanonicalName}")
+      withSQLConf(
+        "spark.sql.hive.convertMetastoreParquet" -> "true",
+        SQLConf.PARQUET_USE_DATA_SOURCE_API -> "true") {
+
+        withTempTable("jt") {
+          (1 to 10).map(i => i -> s"str$i").toDF("a", "b").registerTempTable("jt")
+
+          withTable("test_parquet_ctas") {
+            sql(
+              """CREATE TABLE test_parquet_ctas STORED AS PARQUET
+                |AS SELECT tmp.a FROM jt tmp WHERE tmp.a < 5
+              """.stripMargin)
+
+            checkAnswer(
+              sql(s"SELECT a FROM test_parquet_ctas WHERE a > 2 "),
+              Row(3) :: Row(4) :: Nil)
+
+            table("test_parquet_ctas").queryExecution.optimizedPlan match {
+              case LogicalRelation(p: ParquetRelation2) => // OK
+              case _ =>
+                fail(s"test_parquet_ctas should have be converted to ${classOf[ParquetRelation2]}")
+            }
+          }
+        }
       }
-
-      // Clenup and reset confs.
-      sql("DROP TABLE IF EXISTS jt")
-      sql("DROP TABLE IF EXISTS test_parquet_ctas")
-      setConf("spark.sql.hive.convertMetastoreParquet", originalConvertMetastore)
-      setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalUseDataSource)
     }
   }
 
   test("Pre insert nullability check (ArrayType)") {
-    val df1 =
-      createDataFrame(Tuple1(Seq(Int.box(1), null.asInstanceOf[Integer])) :: Nil).toDF("a")
-    val expectedSchema1 =
-      StructType(
-        StructField("a", ArrayType(IntegerType, containsNull = true), nullable = true) :: Nil)
-    assert(df1.schema === expectedSchema1)
-    df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("arrayInParquet")
-
-    val df2 =
-      createDataFrame(Tuple1(Seq(2, 3)) :: Nil).toDF("a")
-    val expectedSchema2 =
-      StructType(
-        StructField("a", ArrayType(IntegerType, containsNull = false), nullable = true) :: Nil)
-    assert(df2.schema === expectedSchema2)
-    df2.write.mode(SaveMode.Append).insertInto("arrayInParquet")
-    createDataFrame(Tuple1(Seq(4, 5)) :: Nil).toDF("a").write.mode(SaveMode.Append)
-      .saveAsTable("arrayInParquet") // This one internally calls df2.insertInto.
-    createDataFrame(Tuple1(Seq(Int.box(6), null.asInstanceOf[Integer])) :: Nil).toDF("a").write
-      .mode(SaveMode.Append).saveAsTable("arrayInParquet")
-    refreshTable("arrayInParquet")
-
-    checkAnswer(
-      sql("SELECT a FROM arrayInParquet"),
-      Row(ArrayBuffer(1, null)) ::
-        Row(ArrayBuffer(2, 3)) ::
-        Row(ArrayBuffer(4, 5)) ::
-        Row(ArrayBuffer(6, null)) :: Nil)
-
-    sql("DROP TABLE arrayInParquet")
+    withTable("arrayInParquet") {
+      {
+        val df = (Tuple1(Seq(Int.box(1), null: Integer)) :: Nil).toDF("a")
+        val expectedSchema =
+          StructType(
+            StructField(
+              "a",
+              ArrayType(IntegerType, containsNull = true),
+              nullable = true) :: Nil)
+
+        assert(df.schema === expectedSchema)
+
+        df.write
+          .format("parquet")
+          .mode(SaveMode.Overwrite)
+          .saveAsTable("arrayInParquet")
+      }
+
+      {
+        val df = (Tuple1(Seq(2, 3)) :: Nil).toDF("a")
+        val expectedSchema =
+          StructType(
+            StructField(
+              "a",
+              ArrayType(IntegerType, containsNull = false),
+              nullable = true) :: Nil)
+
+        assert(df.schema === expectedSchema)
+
+        df.write
+          .format("parquet")
+          .mode(SaveMode.Append)
+          .insertInto("arrayInParquet")
+      }
+
+      (Tuple1(Seq(4, 5)) :: Nil).toDF("a")
+        .write
+        .mode(SaveMode.Append)
+        .saveAsTable("arrayInParquet") // This one internally calls df2.insertInto.
+
+      (Tuple1(Seq(Int.box(6), null: Integer)) :: Nil).toDF("a")
+        .write
+        .mode(SaveMode.Append)
+        .saveAsTable("arrayInParquet")
+
+      refreshTable("arrayInParquet")
+
+      checkAnswer(
+        sql("SELECT a FROM arrayInParquet"),
+        Row(ArrayBuffer(1, null)) ::
+          Row(ArrayBuffer(2, 3)) ::
+          Row(ArrayBuffer(4, 5)) ::
+          Row(ArrayBuffer(6, null)) :: Nil)
+    }
   }
 
   test("Pre insert nullability check (MapType)") {
-    val df1 =
-      createDataFrame(Tuple1(Map(1 -> null.asInstanceOf[Integer])) :: Nil).toDF("a")
-    val mapType1 = MapType(IntegerType, IntegerType, valueContainsNull = true)
-    val expectedSchema1 =
-      StructType(
-        StructField("a", mapType1, nullable = true) :: Nil)
-    assert(df1.schema === expectedSchema1)
-    df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("mapInParquet")
-
-    val df2 =
-      createDataFrame(Tuple1(Map(2 -> 3)) :: Nil).toDF("a")
-    val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = false)
-    val expectedSchema2 =
-      StructType(
-        StructField("a", mapType2, nullable = true) :: Nil)
-    assert(df2.schema === expectedSchema2)
-    df2.write.mode(SaveMode.Append).insertInto("mapInParquet")
-    createDataFrame(Tuple1(Map(4 -> 5)) :: Nil).toDF("a").write.mode(SaveMode.Append)
-      .saveAsTable("mapInParquet") // This one internally calls df2.insertInto.
-    createDataFrame(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a").write
-      .format("parquet").mode(SaveMode.Append).saveAsTable("mapInParquet")
-    refreshTable("mapInParquet")
-
-    checkAnswer(
-      sql("SELECT a FROM mapInParquet"),
-      Row(Map(1 -> null)) ::
-        Row(Map(2 -> 3)) ::
-        Row(Map(4 -> 5)) ::
-        Row(Map(6 -> null)) :: Nil)
-
-    sql("DROP TABLE mapInParquet")
+    withTable("mapInParquet") {
+      {
+        val df = (Tuple1(Map(1 -> (null: Integer))) :: Nil).toDF("a")
+        val expectedSchema =
+          StructType(
+            StructField(
+              "a",
+              MapType(IntegerType, IntegerType, valueContainsNull = true),
+              nullable = true) :: Nil)
+
+        assert(df.schema === expectedSchema)
+
+        df.write
+          .format("parquet")
+          .mode(SaveMode.Overwrite)
+          .saveAsTable("mapInParquet")
+      }
+
+      {
+        val df = (Tuple1(Map(2 -> 3)) :: Nil).toDF("a")
+        val expectedSchema =
+          StructType(
+            StructField(
+              "a",
+              MapType(IntegerType, IntegerType, valueContainsNull = false),
+              nullable = true) :: Nil)
+
+        assert(df.schema === expectedSchema)
+
+        df.write
+          .format("parquet")
+          .mode(SaveMode.Append)
+          .insertInto("mapInParquet")
+      }
+
+      (Tuple1(Map(4 -> 5)) :: Nil).toDF("a")
+        .write
+        .format("parquet")
+        .mode(SaveMode.Append)
+        .saveAsTable("mapInParquet") // This one internally calls df2.insertInto.
+
+      (Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a")
+        .write
+        .format("parquet")
+        .mode(SaveMode.Append)
+        .saveAsTable("mapInParquet")
+
+      refreshTable("mapInParquet")
+
+      checkAnswer(
+        sql("SELECT a FROM mapInParquet"),
+        Row(Map(1 -> null)) ::
+          Row(Map(2 -> 3)) ::
+          Row(Map(4 -> 5)) ::
+          Row(Map(6 -> null)) :: Nil)
+    }
   }
 
   test("SPARK-6024 wide schema support") {
-    // We will need 80 splits for this schema if the threshold is 4000.
-    val schema = StructType((1 to 5000).map(i => StructField(s"c_${i}", StringType, true)))
-    assert(
-      schema.json.size > conf.schemaStringLengthThreshold,
-      "To correctly test the fix of SPARK-6024, the value of " +
-      s"spark.sql.sources.schemaStringLengthThreshold needs to be less than ${schema.json.size}")
-    // Manually create a metastore data source table.
-    catalog.createDataSourceTable(
-      tableName = "wide_schema",
-      userSpecifiedSchema = Some(schema),
-      partitionColumns = Array.empty[String],
-      provider = "json",
-      options = Map("path" -> "just a dummy path"),
-      isExternal = false)
-
-    invalidateTable("wide_schema")
-
-    val actualSchema = table("wide_schema").schema
-    assert(schema === actualSchema)
+    withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD -> "4000") {
+      withTable("wide_schema") {
+        // We will need 80 splits for this schema if the threshold is 4000.
+        val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true)))
+
+        // Manually create a metastore data source table.
+        catalog.createDataSourceTable(
+          tableName = "wide_schema",
+          userSpecifiedSchema = Some(schema),
+          partitionColumns = Array.empty[String],
+          provider = "json",
+          options = Map("path" -> "just a dummy path"),
+          isExternal = false)
+
+        invalidateTable("wide_schema")
+
+        val actualSchema = table("wide_schema").schema
+        assert(schema === actualSchema)
+      }
+    }
   }
 
   test("SPARK-6655 still support a schema stored in spark.sql.sources.schema") {
     val tableName = "spark6655"
-    val schema = StructType(StructField("int", IntegerType, true) :: Nil)
-
-    val hiveTable = HiveTable(
-      specifiedDatabase = Some("default"),
-      name = tableName,
-      schema = Seq.empty,
-      partitionColumns = Seq.empty,
-      properties = Map(
-        "spark.sql.sources.provider" -> "json",
-        "spark.sql.sources.schema" -> schema.json,
-        "EXTERNAL" -> "FALSE"),
-      tableType = ManagedTable,
-      serdeProperties = Map(
-        "path" -> catalog.hiveDefaultTableFilePath(tableName)))
-
-    catalog.client.createTable(hiveTable)
-
-    invalidateTable(tableName)
-    val actualSchema = table(tableName).schema
-    assert(schema === actualSchema)
-    sql(s"drop table $tableName")
+    withTable(tableName) {
+      val schema = StructType(StructField("int", IntegerType, true) :: Nil)
+      val hiveTable = HiveTable(
+        specifiedDatabase = Some("default"),
+        name = tableName,
+        schema = Seq.empty,
+        partitionColumns = Seq.empty,
+        properties = Map(
+          "spark.sql.sources.provider" -> "json",
+          "spark.sql.sources.schema" -> schema.json,
+          "EXTERNAL" -> "FALSE"),
+        tableType = ManagedTable,
+        serdeProperties = Map(
+          "path" -> catalog.hiveDefaultTableFilePath(tableName)))
+
+      catalog.client.createTable(hiveTable)
+
+      invalidateTable(tableName)
+      val actualSchema = table(tableName).schema
+      assert(schema === actualSchema)
+    }
   }
 
   test("Saving partition columns information") {
-    val df =
-      sparkContext.parallelize(1 to 10, 4).map { i =>
-        Tuple4(i, i + 1, s"str$i", s"str${i + 1}")
-      }.toDF("a", "b", "c", "d")
-
+    val df = (1 to 10).map(i => (i, i + 1, s"str$i", s"str${i + 1}")).toDF("a", "b", "c", "d")
     val tableName = s"partitionInfo_${System.currentTimeMillis()}"
-    df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName)
-    invalidateTable(tableName)
-    val metastoreTable = catalog.client.getTable("default", tableName)
-    val expectedPartitionColumns =
-      StructType(df.schema("d") :: df.schema("b") :: Nil)
-    val actualPartitionColumns =
-      StructType(
-        metastoreTable.partitionColumns.map(c =>
-          StructField(c.name, HiveMetastoreTypes.toDataType(c.hiveType))))
-    // Make sure partition columns are correctly stored in metastore.
-    assert(
-      expectedPartitionColumns.sameType(actualPartitionColumns),
-      s"Partitions columns stored in metastore $actualPartitionColumns is not the " +
-        s"partition columns defined by the saveAsTable operation $expectedPartitionColumns.")
-
-    // Check the content of the saved table.
-    checkAnswer(
-      table(tableName).selectExpr("c", "b", "d", "a"),
-      df.selectExpr("c", "b", "d", "a").collect())
-
-    sql(s"drop table $tableName")
+
+    withTable(tableName) {
+      df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName)
+      invalidateTable(tableName)
+      val metastoreTable = catalog.client.getTable("default", tableName)
+      val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
+      val actualPartitionColumns =
+        StructType(
+          metastoreTable.partitionColumns.map(c =>
+            StructField(c.name, HiveMetastoreTypes.toDataType(c.hiveType))))
+      // Make sure partition columns are correctly stored in metastore.
+      assert(
+        expectedPartitionColumns.sameType(actualPartitionColumns),
+        s"Partitions columns stored in metastore $actualPartitionColumns is not the " +
+          s"partition columns defined by the saveAsTable operation $expectedPartitionColumns.")
+
+      // Check the content of the saved table.
+      checkAnswer(
+        table(tableName).select("c", "b", "d", "a"),
+        df.select("c", "b", "d", "a"))
+    }
   }
 
   test("insert into a table") {
-    def createDF(from: Int, to: Int): DataFrame =
-      createDataFrame((from to to).map(i => Tuple2(i, s"str$i"))).toDF("c1", "c2")
+    def createDF(from: Int, to: Int): DataFrame = {
+      (from to to).map(i => i -> s"str$i").toDF("c1", "c2")
+    }
 
-    createDF(0, 9).write.format("parquet").saveAsTable("insertParquet")
-    checkAnswer(
-      sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
-      (6 to 9).map(i => Row(i, s"str$i")))
+    withTable("insertParquet") {
+      createDF(0, 9).write.format("parquet").saveAsTable("insertParquet")
+      checkAnswer(
+        sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
+        (6 to 9).map(i => Row(i, s"str$i")))
 
-    intercept[AnalysisException] {
-      createDF(10, 19).write.format("parquet").saveAsTable("insertParquet")
-    }
+      intercept[AnalysisException] {
+        createDF(10, 19).write.format("parquet").saveAsTable("insertParquet")
+      }
 
-    createDF(10, 19).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet")
-    checkAnswer(
-      sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
-      (6 to 19).map(i => Row(i, s"str$i")))
+      createDF(10, 19).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet")
+      checkAnswer(
+        sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
+        (6 to 19).map(i => Row(i, s"str$i")))
 
-    createDF(20, 29).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet")
-    checkAnswer(
-      sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 25"),
-      (6 to 24).map(i => Row(i, s"str$i")))
+      createDF(20, 29).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet")
+      checkAnswer(
+        sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 25"),
+        (6 to 24).map(i => Row(i, s"str$i")))
 
-    intercept[AnalysisException] {
-      createDF(30, 39).write.saveAsTable("insertParquet")
-    }
+      intercept[AnalysisException] {
+        createDF(30, 39).write.saveAsTable("insertParquet")
+      }
+
+      createDF(30, 39).write.mode(SaveMode.Append).saveAsTable("insertParquet")
+      checkAnswer(
+        sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"),
+        (6 to 34).map(i => Row(i, s"str$i")))
 
-    createDF(30, 39).write.mode(SaveMode.Append).saveAsTable("insertParquet")
-    checkAnswer(
-      sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"),
-      (6 to 34).map(i => Row(i, s"str$i")))
-
-    createDF(40, 49).write.mode(SaveMode.Append).insertInto("insertParquet")
-    checkAnswer(
-      sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"),
-      (6 to 44).map(i => Row(i, s"str$i")))
-
-    createDF(50, 59).write.mode(SaveMode.Overwrite).saveAsTable("insertParquet")
-    checkAnswer(
-      sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 51 AND p.c1 < 55"),
-      (52 to 54).map(i => Row(i, s"str$i")))
-    createDF(60, 69).write.mode(SaveMode.Ignore).saveAsTable("insertParquet")
-    checkAnswer(
-      sql("SELECT p.c1, c2 FROM insertParquet p"),
-      (50 to 59).map(i => Row(i, s"str$i")))
-
-    createDF(70, 79).write.mode(SaveMode.Overwrite).insertInto("insertParquet")
-    checkAnswer(
-      sql("SELECT p.c1, c2 FROM insertParquet p"),
-      (70 to 79).map(i => Row(i, s"str$i")))
+      createDF(40, 49).write.mode(SaveMode.Append).insertInto("insertParquet")
+      checkAnswer(
+        sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"),
+        (6 to 44).map(i => Row(i, s"str$i")))
+
+      createDF(50, 59).write.mode(SaveMode.Overwrite).saveAsTable("insertParquet")
+      checkAnswer(
+        sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 51 AND p.c1 < 55"),
+        (52 to 54).map(i => Row(i, s"str$i")))
+      createDF(60, 69).write.mode(SaveMode.Ignore).saveAsTable("insertParquet")
+      checkAnswer(
+        sql("SELECT p.c1, c2 FROM insertParquet p"),
+        (50 to 59).map(i => Row(i, s"str$i")))
+
+      createDF(70, 79).write.mode(SaveMode.Overwrite).insertInto("insertParquet")
+      checkAnswer(
+        sql("SELECT p.c1, c2 FROM insertParquet p"),
+        (70 to 79).map(i => Row(i, s"str$i")))
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org