You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/09/08 06:21:53 UTC

spark git commit: [SPARK-21936][SQL] backward compatibility test framework for HiveExternalCatalog

Repository: spark
Updated Branches:
  refs/heads/master 6e37524a1 -> dbb824125


[SPARK-21936][SQL] backward compatibility test framework for HiveExternalCatalog

## What changes were proposed in this pull request?

`HiveExternalCatalog` is a semi-public interface. When creating tables, `HiveExternalCatalog` converts the table metadata to the Hive table format and saves it into the Hive metastore. It's very important to guarantee backward compatibility here, i.e., tables created by previous Spark versions should still be readable in newer Spark versions.

Previously we found backward compatibility issues manually, which made it really easy to miss bugs. This PR introduces a test framework that automatically tests `HiveExternalCatalog` backward compatibility by downloading Spark binaries of different versions, creating tables with these Spark versions, and reading these tables with the current Spark version.

## How was this patch tested?

test-only change

Author: Wenchen Fan <we...@databricks.com>

Closes #19148 from cloud-fan/test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dbb82412
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dbb82412
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dbb82412

Branch: refs/heads/master
Commit: dbb824125d4d31166d9a47c330f8d51f5d159515
Parents: 6e37524
Author: Wenchen Fan <we...@databricks.com>
Authored: Thu Sep 7 23:21:49 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Thu Sep 7 23:21:49 2017 -0700

----------------------------------------------------------------------
 sql/hive/pom.xml                                |   4 +
 ...ernalCatalogBackwardCompatibilitySuite.scala | 260 -------------------
 .../hive/HiveExternalCatalogVersionsSuite.scala | 194 ++++++++++++++
 .../spark/sql/hive/HiveSparkSubmitSuite.scala   |  77 +-----
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  27 --
 .../spark/sql/hive/SparkSubmitTestUtils.scala   | 101 +++++++
 6 files changed, 301 insertions(+), 362 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dbb82412/sql/hive/pom.xml
----------------------------------------------------------------------
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index a649daf..66fad85 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -177,6 +177,10 @@
       <artifactId>libfb303</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derby</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-compiler</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/spark/blob/dbb82412/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
deleted file mode 100644
index 3bd3d0d..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.net.URI
-
-import org.apache.hadoop.fs.Path
-import org.scalatest.BeforeAndAfterEach
-
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.Utils
-
-
-class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
-  with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
-
-  val tempDir = Utils.createTempDir().getCanonicalFile
-  val tempDirUri = tempDir.toURI
-  val tempDirStr = tempDir.getAbsolutePath
-
-  override def beforeEach(): Unit = {
-    sql("CREATE DATABASE test_db")
-    for ((tbl, _) <- rawTablesAndExpectations) {
-      hiveClient.createTable(tbl, ignoreIfExists = false)
-    }
-  }
-
-  override def afterEach(): Unit = {
-    Utils.deleteRecursively(tempDir)
-    hiveClient.dropDatabase("test_db", ignoreIfNotExists = false, cascade = true)
-  }
-
-  private def getTableMetadata(tableName: String): CatalogTable = {
-    spark.sharedState.externalCatalog.getTable("test_db", tableName)
-  }
-
-  private def defaultTableURI(tableName: String): URI = {
-    spark.sessionState.catalog.defaultTablePath(TableIdentifier(tableName, Some("test_db")))
-  }
-
-  // Raw table metadata that are dumped from tables created by Spark 2.0. Note that, all spark
-  // versions prior to 2.1 would generate almost same raw table metadata for a specific table.
-  val simpleSchema = new StructType().add("i", "int")
-  val partitionedSchema = new StructType().add("i", "int").add("j", "int")
-
-  lazy val hiveTable = CatalogTable(
-    identifier = TableIdentifier("tbl1", Some("test_db")),
-    tableType = CatalogTableType.MANAGED,
-    storage = CatalogStorageFormat.empty.copy(
-      inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
-      outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
-    schema = simpleSchema)
-
-  lazy val externalHiveTable = CatalogTable(
-    identifier = TableIdentifier("tbl2", Some("test_db")),
-    tableType = CatalogTableType.EXTERNAL,
-    storage = CatalogStorageFormat.empty.copy(
-      locationUri = Some(tempDirUri),
-      inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
-      outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
-    schema = simpleSchema)
-
-  lazy val partitionedHiveTable = CatalogTable(
-    identifier = TableIdentifier("tbl3", Some("test_db")),
-    tableType = CatalogTableType.MANAGED,
-    storage = CatalogStorageFormat.empty.copy(
-      inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
-      outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
-    schema = partitionedSchema,
-    partitionColumnNames = Seq("j"))
-
-
-  val simpleSchemaJson =
-    """
-      |{
-      | "type": "struct",
-      | "fields": [{
-      |             "name": "i",
-      |             "type": "integer",
-      |             "nullable": true,
-      |             "metadata": {}
-      |            }]
-      |}
-    """.stripMargin
-
-  val partitionedSchemaJson =
-    """
-      |{
-      | "type": "struct",
-      | "fields": [{
-      |             "name": "i",
-      |             "type": "integer",
-      |             "nullable": true,
-      |             "metadata": {}
-      |            },
-      |            {
-      |             "name": "j",
-      |             "type": "integer",
-      |             "nullable": true,
-      |             "metadata": {}
-      |            }]
-      |}
-    """.stripMargin
-
-  lazy val dataSourceTable = CatalogTable(
-    identifier = TableIdentifier("tbl4", Some("test_db")),
-    tableType = CatalogTableType.MANAGED,
-    storage = CatalogStorageFormat.empty.copy(
-      properties = Map("path" -> defaultTableURI("tbl4").toString)),
-    schema = new StructType(),
-    provider = Some("json"),
-    properties = Map(
-      "spark.sql.sources.provider" -> "json",
-      "spark.sql.sources.schema.numParts" -> "1",
-      "spark.sql.sources.schema.part.0" -> simpleSchemaJson))
-
-  lazy val hiveCompatibleDataSourceTable = CatalogTable(
-    identifier = TableIdentifier("tbl5", Some("test_db")),
-    tableType = CatalogTableType.MANAGED,
-    storage = CatalogStorageFormat.empty.copy(
-      properties = Map("path" -> defaultTableURI("tbl5").toString)),
-    schema = simpleSchema,
-    provider = Some("parquet"),
-    properties = Map(
-      "spark.sql.sources.provider" -> "parquet",
-      "spark.sql.sources.schema.numParts" -> "1",
-      "spark.sql.sources.schema.part.0" -> simpleSchemaJson))
-
-  lazy val partitionedDataSourceTable = CatalogTable(
-    identifier = TableIdentifier("tbl6", Some("test_db")),
-    tableType = CatalogTableType.MANAGED,
-    storage = CatalogStorageFormat.empty.copy(
-      properties = Map("path" -> defaultTableURI("tbl6").toString)),
-    schema = new StructType(),
-    provider = Some("json"),
-    properties = Map(
-      "spark.sql.sources.provider" -> "json",
-      "spark.sql.sources.schema.numParts" -> "1",
-      "spark.sql.sources.schema.part.0" -> partitionedSchemaJson,
-      "spark.sql.sources.schema.numPartCols" -> "1",
-      "spark.sql.sources.schema.partCol.0" -> "j"))
-
-  lazy val externalDataSourceTable = CatalogTable(
-    identifier = TableIdentifier("tbl7", Some("test_db")),
-    tableType = CatalogTableType.EXTERNAL,
-    storage = CatalogStorageFormat.empty.copy(
-      locationUri = Some(new URI(defaultTableURI("tbl7") + "-__PLACEHOLDER__")),
-      properties = Map("path" -> tempDirStr)),
-    schema = new StructType(),
-    provider = Some("json"),
-    properties = Map(
-      "spark.sql.sources.provider" -> "json",
-      "spark.sql.sources.schema.numParts" -> "1",
-      "spark.sql.sources.schema.part.0" -> simpleSchemaJson))
-
-  lazy val hiveCompatibleExternalDataSourceTable = CatalogTable(
-    identifier = TableIdentifier("tbl8", Some("test_db")),
-    tableType = CatalogTableType.EXTERNAL,
-    storage = CatalogStorageFormat.empty.copy(
-      locationUri = Some(tempDirUri),
-      properties = Map("path" -> tempDirStr)),
-    schema = simpleSchema,
-    properties = Map(
-      "spark.sql.sources.provider" -> "parquet",
-      "spark.sql.sources.schema.numParts" -> "1",
-      "spark.sql.sources.schema.part.0" -> simpleSchemaJson))
-
-  lazy val dataSourceTableWithoutSchema = CatalogTable(
-    identifier = TableIdentifier("tbl9", Some("test_db")),
-    tableType = CatalogTableType.EXTERNAL,
-    storage = CatalogStorageFormat.empty.copy(
-      locationUri = Some(new URI(defaultTableURI("tbl9") + "-__PLACEHOLDER__")),
-      properties = Map("path" -> tempDirStr)),
-    schema = new StructType(),
-    provider = Some("json"),
-    properties = Map("spark.sql.sources.provider" -> "json"))
-
-  // A list of all raw tables we want to test, with their expected schema.
-  lazy val rawTablesAndExpectations = Seq(
-    hiveTable -> simpleSchema,
-    externalHiveTable -> simpleSchema,
-    partitionedHiveTable -> partitionedSchema,
-    dataSourceTable -> simpleSchema,
-    hiveCompatibleDataSourceTable -> simpleSchema,
-    partitionedDataSourceTable -> partitionedSchema,
-    externalDataSourceTable -> simpleSchema,
-    hiveCompatibleExternalDataSourceTable -> simpleSchema,
-    dataSourceTableWithoutSchema -> new StructType())
-
-  test("make sure we can read table created by old version of Spark") {
-    for ((tbl, expectedSchema) <- rawTablesAndExpectations) {
-      val readBack = getTableMetadata(tbl.identifier.table)
-      assert(readBack.schema.sameType(expectedSchema))
-
-      if (tbl.tableType == CatalogTableType.EXTERNAL) {
-        // trim the URI prefix
-        val tableLocation = readBack.storage.locationUri.get.getPath
-        val expectedLocation = tempDir.toURI.getPath.stripSuffix("/")
-        assert(tableLocation == expectedLocation)
-      }
-    }
-  }
-
-  test("make sure we can alter table location created by old version of Spark") {
-    withTempDir { dir =>
-      for ((tbl, _) <- rawTablesAndExpectations if tbl.tableType == CatalogTableType.EXTERNAL) {
-        val path = dir.toURI.toString.stripSuffix("/")
-        sql(s"ALTER TABLE ${tbl.identifier} SET LOCATION '$path'")
-
-        val readBack = getTableMetadata(tbl.identifier.table)
-
-        // trim the URI prefix
-        val actualTableLocation = readBack.storage.locationUri.get.getPath
-        val expected = dir.toURI.getPath.stripSuffix("/")
-        assert(actualTableLocation == expected)
-      }
-    }
-  }
-
-  test("make sure we can rename table created by old version of Spark") {
-    for ((tbl, expectedSchema) <- rawTablesAndExpectations) {
-      val newName = tbl.identifier.table + "_renamed"
-      sql(s"ALTER TABLE ${tbl.identifier} RENAME TO $newName")
-
-      val readBack = getTableMetadata(newName)
-      assert(readBack.schema.sameType(expectedSchema))
-
-      // trim the URI prefix
-      val actualTableLocation = readBack.storage.locationUri.get.getPath
-      val expectedLocation = if (tbl.tableType == CatalogTableType.EXTERNAL) {
-        tempDir.toURI.getPath.stripSuffix("/")
-      } else {
-        // trim the URI prefix
-        defaultTableURI(newName).getPath
-      }
-      assert(actualTableLocation == expectedLocation)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/dbb82412/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
new file mode 100644
index 0000000..2928a73
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+import java.nio.file.Files
+
+import org.apache.spark.TestUtils
+import org.apache.spark.sql.{QueryTest, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.util.Utils
+
+/**
+ * Test HiveExternalCatalog backward compatibility.
+ *
+ * Note that, this test suite will automatically download spark binary packages of different
+ * versions to a local directory `/tmp/spark-test`. If there is already a spark folder with
+ * expected version under this local directory, e.g. `/tmp/spark-test/spark-2.0.3`, we will skip the
+ * downloading for this spark version.
+ */
+class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
+  private val wareHousePath = Utils.createTempDir(namePrefix = "warehouse")
+  private val tmpDataDir = Utils.createTempDir(namePrefix = "test-data")
+  private val sparkTestingDir = "/tmp/spark-test"
+  private val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+
+  override def afterAll(): Unit = {
+    Utils.deleteRecursively(wareHousePath)
+    Utils.deleteRecursively(tmpDataDir)
+    super.afterAll()
+  }
+
+  private def downloadSpark(version: String): Unit = {
+    import scala.sys.process._
+
+    val url = s"https://d3kbcqa49mib13.cloudfront.net/spark-$version-bin-hadoop2.7.tgz"
+
+    Seq("wget", url, "-q", "-P", sparkTestingDir).!
+
+    val downloaded = new File(sparkTestingDir, s"spark-$version-bin-hadoop2.7.tgz").getCanonicalPath
+    val targetDir = new File(sparkTestingDir, s"spark-$version").getCanonicalPath
+
+    Seq("mkdir", targetDir).!
+
+    Seq("tar", "-xzf", downloaded, "-C", targetDir, "--strip-components=1").!
+
+    Seq("rm", downloaded).!
+  }
+
+  private def genDataDir(name: String): String = {
+    new File(tmpDataDir, name).getCanonicalPath
+  }
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    val tempPyFile = File.createTempFile("test", ".py")
+    Files.write(tempPyFile.toPath,
+      s"""
+        |from pyspark.sql import SparkSession
+        |
+        |spark = SparkSession.builder.enableHiveSupport().getOrCreate()
+        |version_index = spark.conf.get("spark.sql.test.version.index", None)
+        |
+        |spark.sql("create table data_source_tbl_{} using json as select 1 i".format(version_index))
+        |
+        |spark.sql("create table hive_compatible_data_source_tbl_" + version_index + \\
+        |          " using parquet as select 1 i")
+        |
+        |json_file = "${genDataDir("json_")}" + str(version_index)
+        |spark.range(1, 2).selectExpr("cast(id as int) as i").write.json(json_file)
+        |spark.sql("create table external_data_source_tbl_" + version_index + \\
+        |          "(i int) using json options (path '{}')".format(json_file))
+        |
+        |parquet_file = "${genDataDir("parquet_")}" + str(version_index)
+        |spark.range(1, 2).selectExpr("cast(id as int) as i").write.parquet(parquet_file)
+        |spark.sql("create table hive_compatible_external_data_source_tbl_" + version_index + \\
+        |          "(i int) using parquet options (path '{}')".format(parquet_file))
+        |
+        |json_file2 = "${genDataDir("json2_")}" + str(version_index)
+        |spark.range(1, 2).selectExpr("cast(id as int) as i").write.json(json_file2)
+        |spark.sql("create table external_table_without_schema_" + version_index + \\
+        |          " using json options (path '{}')".format(json_file2))
+        |
+        |spark.sql("create view v_{} as select 1 i".format(version_index))
+      """.stripMargin.getBytes("utf8"))
+
+    PROCESS_TABLES.testingVersions.zipWithIndex.foreach { case (version, index) =>
+      val sparkHome = new File(sparkTestingDir, s"spark-$version")
+      if (!sparkHome.exists()) {
+        downloadSpark(version)
+      }
+
+      val args = Seq(
+        "--name", "prepare testing tables",
+        "--master", "local[2]",
+        "--conf", "spark.ui.enabled=false",
+        "--conf", "spark.master.rest.enabled=false",
+        "--conf", s"spark.sql.warehouse.dir=${wareHousePath.getCanonicalPath}",
+        "--conf", s"spark.sql.test.version.index=$index",
+        "--driver-java-options", s"-Dderby.system.home=${wareHousePath.getCanonicalPath}",
+        tempPyFile.getCanonicalPath)
+      runSparkSubmit(args, Some(sparkHome.getCanonicalPath))
+    }
+
+    tempPyFile.delete()
+  }
+
+  test("backward compatibility") {
+    val args = Seq(
+      "--class", PROCESS_TABLES.getClass.getName.stripSuffix("$"),
+      "--name", "HiveExternalCatalog backward compatibility test",
+      "--master", "local[2]",
+      "--conf", "spark.ui.enabled=false",
+      "--conf", "spark.master.rest.enabled=false",
+      "--conf", s"spark.sql.warehouse.dir=${wareHousePath.getCanonicalPath}",
+      "--driver-java-options", s"-Dderby.system.home=${wareHousePath.getCanonicalPath}",
+      unusedJar.toString)
+    runSparkSubmit(args)
+  }
+}
+
+object PROCESS_TABLES extends QueryTest with SQLTestUtils {
+  // Tests the latest version of every release line.
+  val testingVersions = Seq("2.0.2", "2.1.1", "2.2.0")
+
+  protected var spark: SparkSession = _
+
+  def main(args: Array[String]): Unit = {
+    val session = SparkSession.builder()
+      .enableHiveSupport()
+      .getOrCreate()
+    spark = session
+
+    testingVersions.indices.foreach { index =>
+      Seq(
+        s"data_source_tbl_$index",
+        s"hive_compatible_data_source_tbl_$index",
+        s"external_data_source_tbl_$index",
+        s"hive_compatible_external_data_source_tbl_$index",
+        s"external_table_without_schema_$index").foreach { tbl =>
+        val tableMeta = spark.sharedState.externalCatalog.getTable("default", tbl)
+
+        // make sure we can insert and query these tables.
+        session.sql(s"insert into $tbl select 2")
+        checkAnswer(session.sql(s"select * from $tbl"), Row(1) :: Row(2) :: Nil)
+        checkAnswer(session.sql(s"select i from $tbl where i > 1"), Row(2))
+
+        // make sure we can rename table.
+        val newName = tbl + "_renamed"
+        sql(s"ALTER TABLE $tbl RENAME TO $newName")
+        val readBack = spark.sharedState.externalCatalog.getTable("default", newName)
+
+        val actualTableLocation = readBack.storage.locationUri.get.getPath
+        val expectedLocation = if (tableMeta.tableType == CatalogTableType.EXTERNAL) {
+          tableMeta.storage.locationUri.get.getPath
+        } else {
+          spark.sessionState.catalog.defaultTablePath(TableIdentifier(newName, None)).getPath
+        }
+        assert(actualTableLocation == expectedLocation)
+
+        // make sure we can alter table location.
+        withTempDir { dir =>
+          val path = dir.toURI.toString.stripSuffix("/")
+          sql(s"ALTER TABLE ${tbl}_renamed SET LOCATION '$path'")
+          val readBack = spark.sharedState.externalCatalog.getTable("default", tbl + "_renamed")
+          val actualTableLocation = readBack.storage.locationUri.get.getPath
+          val expected = dir.toURI.getPath.stripSuffix("/")
+          assert(actualTableLocation == expected)
+        }
+      }
+
+      // test permanent view
+      checkAnswer(sql(s"select i from v_$index"), Row(1))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/dbb82412/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index be6aa6d..21b3e28 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -18,17 +18,11 @@
 package org.apache.spark.sql.hive
 
 import java.io.{BufferedWriter, File, FileWriter}
-import java.sql.Timestamp
-import java.util.Date
 
-import scala.collection.mutable.ArrayBuffer
 import scala.tools.nsc.Properties
 
 import org.apache.hadoop.fs.Path
 import org.scalatest.{BeforeAndAfterEach, Matchers}
-import org.scalatest.concurrent.TimeLimits
-import org.scalatest.exceptions.TestFailedDueToTimeoutException
-import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
@@ -38,7 +32,6 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
-import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.sql.types.{DecimalType, StructType}
 import org.apache.spark.util.{ResetSystemProperties, Utils}
 
@@ -46,11 +39,10 @@ import org.apache.spark.util.{ResetSystemProperties, Utils}
  * This suite tests spark-submit with applications using HiveContext.
  */
 class HiveSparkSubmitSuite
-  extends SparkFunSuite
+  extends SparkSubmitTestUtils
   with Matchers
   with BeforeAndAfterEach
-  with ResetSystemProperties
-  with TimeLimits {
+  with ResetSystemProperties {
 
   // TODO: rewrite these or mark them as slow tests to be run sparingly
 
@@ -333,71 +325,6 @@ class HiveSparkSubmitSuite
       unusedJar.toString)
     runSparkSubmit(argsForShowTables)
   }
-
-  // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
-  // This is copied from org.apache.spark.deploy.SparkSubmitSuite
-  private def runSparkSubmit(args: Seq[String]): Unit = {
-    val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
-    val history = ArrayBuffer.empty[String]
-    val sparkSubmit = if (Utils.isWindows) {
-      // On Windows, `ProcessBuilder.directory` does not change the current working directory.
-      new File("..\\..\\bin\\spark-submit.cmd").getAbsolutePath
-    } else {
-      "./bin/spark-submit"
-    }
-    val commands = Seq(sparkSubmit) ++ args
-    val commandLine = commands.mkString("'", "' '", "'")
-
-    val builder = new ProcessBuilder(commands: _*).directory(new File(sparkHome))
-    val env = builder.environment()
-    env.put("SPARK_TESTING", "1")
-    env.put("SPARK_HOME", sparkHome)
-
-    def captureOutput(source: String)(line: String): Unit = {
-      // This test suite has some weird behaviors when executed on Jenkins:
-      //
-      // 1. Sometimes it gets extremely slow out of unknown reason on Jenkins.  Here we add a
-      //    timestamp to provide more diagnosis information.
-      // 2. Log lines are not correctly redirected to unit-tests.log as expected, so here we print
-      //    them out for debugging purposes.
-      val logLine = s"${new Timestamp(new Date().getTime)} - $source> $line"
-      // scalastyle:off println
-      println(logLine)
-      // scalastyle:on println
-      history += logLine
-    }
-
-    val process = builder.start()
-    new ProcessOutputCapturer(process.getInputStream, captureOutput("stdout")).start()
-    new ProcessOutputCapturer(process.getErrorStream, captureOutput("stderr")).start()
-
-    try {
-      val exitCode = failAfter(300.seconds) { process.waitFor() }
-      if (exitCode != 0) {
-        // include logs in output. Note that logging is async and may not have completed
-        // at the time this exception is raised
-        Thread.sleep(1000)
-        val historyLog = history.mkString("\n")
-        fail {
-          s"""spark-submit returned with exit code $exitCode.
-             |Command line: $commandLine
-             |
-             |$historyLog
-           """.stripMargin
-        }
-      }
-    } catch {
-      case to: TestFailedDueToTimeoutException =>
-        val historyLog = history.mkString("\n")
-        fail(s"Timeout of $commandLine" +
-            s" See the log4j logs for more detail." +
-            s"\n$historyLog", to)
-        case t: Throwable => throw t
-    } finally {
-      // Ensure we still kill the process in case it timed out
-      process.destroy()
-    }
-  }
 }
 
 object SetMetastoreURLTest extends Logging {

http://git-wip-us.apache.org/repos/asf/spark/blob/dbb82412/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 83cee5d..29b0e6c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1354,31 +1354,4 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       sparkSession.sparkContext.conf.set(DEBUG_MODE, previousValue)
     }
   }
-
-  test("SPARK-18464: support old table which doesn't store schema in table properties") {
-    withTable("old") {
-      withTempPath { path =>
-        Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath)
-        val tableDesc = CatalogTable(
-          identifier = TableIdentifier("old", Some("default")),
-          tableType = CatalogTableType.EXTERNAL,
-          storage = CatalogStorageFormat.empty.copy(
-            properties = Map("path" -> path.getAbsolutePath)
-          ),
-          schema = new StructType(),
-          provider = Some("parquet"),
-          properties = Map(
-            HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet"))
-        hiveClient.createTable(tableDesc, ignoreIfExists = false)
-
-        checkAnswer(spark.table("old"), Row(1, "a"))
-        checkAnswer(sql("select * from old"), Row(1, "a"))
-
-        val expectedSchema = StructType(Seq(
-          StructField("i", IntegerType, nullable = true),
-          StructField("j", StringType, nullable = true)))
-        assert(table("old").schema === expectedSchema)
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/dbb82412/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala
new file mode 100644
index 0000000..ede44df
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+import java.sql.Timestamp
+import java.util.Date
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.concurrent.TimeLimits
+import org.scalatest.exceptions.TestFailedDueToTimeoutException
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
+import org.apache.spark.util.Utils
+
+trait SparkSubmitTestUtils extends SparkFunSuite with TimeLimits {
+
+  // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
+  // This is copied from org.apache.spark.deploy.SparkSubmitSuite
+  protected def runSparkSubmit(args: Seq[String], sparkHomeOpt: Option[String] = None): Unit = {
+    val sparkHome = sparkHomeOpt.getOrElse(
+      sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")))
+    val history = ArrayBuffer.empty[String]
+    val sparkSubmit = if (Utils.isWindows) {
+      // On Windows, `ProcessBuilder.directory` does not change the current working directory.
+      new File("..\\..\\bin\\spark-submit.cmd").getAbsolutePath
+    } else {
+      "./bin/spark-submit"
+    }
+    val commands = Seq(sparkSubmit) ++ args
+    val commandLine = commands.mkString("'", "' '", "'")
+
+    val builder = new ProcessBuilder(commands: _*).directory(new File(sparkHome))
+    val env = builder.environment()
+    env.put("SPARK_TESTING", "1")
+    env.put("SPARK_HOME", sparkHome)
+
+    def captureOutput(source: String)(line: String): Unit = {
+      // This test suite has some weird behaviors when executed on Jenkins:
+      //
+      // 1. Sometimes it gets extremely slow out of unknown reason on Jenkins.  Here we add a
+      //    timestamp to provide more diagnosis information.
+      // 2. Log lines are not correctly redirected to unit-tests.log as expected, so here we print
+      //    them out for debugging purposes.
+      val logLine = s"${new Timestamp(new Date().getTime)} - $source> $line"
+      // scalastyle:off println
+      println(logLine)
+      // scalastyle:on println
+      history += logLine
+    }
+
+    val process = builder.start()
+    new ProcessOutputCapturer(process.getInputStream, captureOutput("stdout")).start()
+    new ProcessOutputCapturer(process.getErrorStream, captureOutput("stderr")).start()
+
+    try {
+      val exitCode = failAfter(300.seconds) { process.waitFor() }
+      if (exitCode != 0) {
+        // include logs in output. Note that logging is async and may not have completed
+        // at the time this exception is raised
+        Thread.sleep(1000)
+        val historyLog = history.mkString("\n")
+        fail {
+          s"""spark-submit returned with exit code $exitCode.
+             |Command line: $commandLine
+             |
+             |$historyLog
+           """.stripMargin
+        }
+      }
+    } catch {
+      case to: TestFailedDueToTimeoutException =>
+        val historyLog = history.mkString("\n")
+        fail(s"Timeout of $commandLine" +
+          s" See the log4j logs for more detail." +
+          s"\n$historyLog", to)
+      case t: Throwable => throw t
+    } finally {
+      // Ensure we still kill the process in case it timed out
+      process.destroy()
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org