Posted to commits@spark.apache.org by we...@apache.org on 2017/07/11 06:50:16 UTC

spark git commit: [SPARK-20331][SQL] Enhanced Hive partition pruning predicate pushdown

Repository: spark
Updated Branches:
  refs/heads/master d4d9e17b3 -> a4baa8f48


[SPARK-20331][SQL] Enhanced Hive partition pruning predicate pushdown

(Link to Jira: https://issues.apache.org/jira/browse/SPARK-20331)

## What changes were proposed in this pull request?

Spark 2.1 introduced scalable support for Hive tables with huge numbers of partitions. Key to leveraging this support is the ability to prune unnecessary table partitions when answering queries. Spark can push down only a subset of the partition pruning predicates that the Hive metastore supports. If a user writes a query with a partition pruning predicate that is *not* supported by Spark, Spark falls back to fetching all partitions and pruning them client-side. We want to broaden Spark's partition pruning predicate pushdown capabilities.

One of the key missing capabilities is support for disjunctions. For example, for a table partitioned by date, writing a query with a predicate like

    date = 20161011 or date = 20161014

will result in Spark fetching all partitions. For a table partitioned by date and hour, querying a range of hours across dates can be quite difficult to accomplish without fetching all partition metadata.
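
As a concrete illustration, consider a hypothetical table `logs` partitioned by `ds` and `h`. The following query spans an hour range across two dates; with this patch the whole disjunction can be pushed to the metastore rather than falling back to fetching every partition (the table name is made up, but the predicate shape mirrors the new test cases below):

    // Hypothetical table; the predicate shape matches the new HiveClientSuite tests.
    spark.sql(
      """SELECT *
        |FROM logs
        |WHERE (ds = 20170101 AND h >= 8) OR (ds = 20170102 AND h < 8)""".stripMargin)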

The current partition pruning implementation supports only comparisons against literals. We can extend it to arbitrary foldable expressions by evaluating them at planning time.
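
A minimal sketch of that idea, not the code in this patch: a constant subexpression such as `20170101 + 1` can be collapsed to a literal before the metastore filter string is built.

    import org.apache.spark.sql.catalyst.expressions.{EmptyRow, Expression, Literal}

    // Sketch: collapse a foldable (constant) expression to a literal at planning time
    // so it can participate in the pushed-down partition filter.
    def foldToLiteral(expr: Expression): Expression =
      if (expr.foldable) Literal.create(expr.eval(EmptyRow), expr.dataType) else expr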

We can also implement support for the "IN" comparison by expanding it to a sequence of "OR"s.
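
The expansion is a simple string rewrite. A slightly simplified form of the helper this patch adds to `HiveShim` (taking the column name as a plain string rather than an `Attribute`):

    // Expand "ds IN (20170102, 20170103)" into "(ds = 20170102 or ds = 20170103)"
    // when building the metastore filter string.
    def convertInToOr(name: String, values: Seq[String]): String =
      values.map(value => s"$name = $value").mkString("(", " or ", ")")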

## How was this patch tested?

The `HiveClientSuite` and `VersionsSuite` were refactored and simplified to make Hive client-based, version-specific testing more modular and conceptually simpler. There are now two Hive test suites: `HiveClientSuite` and `HivePartitionFilteringSuite`. Each takes a single `version` constructor parameter and therefore cannot be run on its own. Instead, they are bundled into "aggregation" test suites, `HiveClientSuites` and `HivePartitionFilteringSuites`, which run each suite once per Hive client version. The `VersionsSuite` and `HiveClientSuite` have been refactored into these aggregation suites, respectively.

`HiveClientSuite` and `HivePartitionFilteringSuite` subclass a new abstract class, `HiveVersionSuite`. `HiveVersionSuite` collects functionality related to testing a single Hive version and overrides relevant test suite methods to display version-specific information.
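
For reference, the pattern looks roughly like this (hypothetical class names; the real implementations are `HiveVersionSuite` and `HiveClientSuites` in the diff below):

    import scala.collection.immutable.IndexedSeq
    import org.scalatest.Suite
    import org.apache.spark.SparkFunSuite

    // A per-version suite takes the Hive version as a constructor argument...
    class ExampleVersionSuite(version: String) extends SparkFunSuite {
      override def suiteName: String = s"${super.suiteName}($version)"
      test("client version is set") { assert(version.nonEmpty) }
    }

    // ...and an aggregation suite instantiates one nested suite per version.
    class ExampleVersionSuites extends Suite {
      override def nestedSuites: IndexedSeq[Suite] =
        IndexedSeq("0.13", "1.2", "2.1").map(new ExampleVersionSuite(_))
    }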

A new trait, `HiveClientVersions`, has been added with a sequence of Hive test versions.

Author: Michael Allman <mi...@videoamp.com>

Closes #17633 from mallman/spark-20331-enhanced_partition_pruning_pushdown.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a4baa8f4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a4baa8f4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a4baa8f4

Branch: refs/heads/master
Commit: a4baa8f48fe611e5c4d147f22fb2bb4c78d58a09
Parents: d4d9e17
Author: Michael Allman <mi...@videoamp.com>
Authored: Tue Jul 11 14:50:11 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Jul 11 14:50:11 2017 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/hive/client/HiveShim.scala |  71 +++++-
 .../sql/hive/client/HiveClientBuilder.scala     |   6 +-
 .../spark/sql/hive/client/HiveClientSuite.scala | 225 +++++++++++++++++--
 .../sql/hive/client/HiveClientSuites.scala      |  29 +++
 .../sql/hive/client/HiveClientVersions.scala    |  26 +++
 .../sql/hive/client/HiveVersionSuite.scala      |  46 ++++
 .../spark/sql/hive/client/VersionsSuite.scala   |   4 +-
 7 files changed, 378 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a4baa8f4/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 7abb9f0..449a303 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
+import scala.util.Try
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
@@ -46,6 +47,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTableParti
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegralType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
 /**
@@ -589,18 +591,67 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
         col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
       .map(col => col.getName).toSet
 
-    filters.collect {
-      case op @ BinaryComparison(a: Attribute, Literal(v, _: IntegralType)) =>
-        s"${a.name} ${op.symbol} $v"
-      case op @ BinaryComparison(Literal(v, _: IntegralType), a: Attribute) =>
-        s"$v ${op.symbol} ${a.name}"
-      case op @ BinaryComparison(a: Attribute, Literal(v, _: StringType))
+    object ExtractableLiteral {
+      def unapply(expr: Expression): Option[String] = expr match {
+        case Literal(value, _: IntegralType) => Some(value.toString)
+        case Literal(value, _: StringType) => Some(quoteStringLiteral(value.toString))
+        case _ => None
+      }
+    }
+
+    object ExtractableLiterals {
+      def unapply(exprs: Seq[Expression]): Option[Seq[String]] = {
+        exprs.map(ExtractableLiteral.unapply).foldLeft(Option(Seq.empty[String])) {
+          case (Some(accum), Some(value)) => Some(accum :+ value)
+          case _ => None
+        }
+      }
+    }
+
+    object ExtractableValues {
+      private lazy val valueToLiteralString: PartialFunction[Any, String] = {
+        case value: Byte => value.toString
+        case value: Short => value.toString
+        case value: Int => value.toString
+        case value: Long => value.toString
+        case value: UTF8String => quoteStringLiteral(value.toString)
+      }
+
+      def unapply(values: Set[Any]): Option[Seq[String]] = {
+        values.toSeq.foldLeft(Option(Seq.empty[String])) {
+          case (Some(accum), value) if valueToLiteralString.isDefinedAt(value) =>
+            Some(accum :+ valueToLiteralString(value))
+          case _ => None
+        }
+      }
+    }
+
+    def convertInToOr(a: Attribute, values: Seq[String]): String = {
+      values.map(value => s"${a.name} = $value").mkString("(", " or ", ")")
+    }
+
+    lazy val convert: PartialFunction[Expression, String] = {
+      case In(a: Attribute, ExtractableLiterals(values))
+          if !varcharKeys.contains(a.name) && values.nonEmpty =>
+        convertInToOr(a, values)
+      case InSet(a: Attribute, ExtractableValues(values))
+          if !varcharKeys.contains(a.name) && values.nonEmpty =>
+        convertInToOr(a, values)
+      case op @ BinaryComparison(a: Attribute, ExtractableLiteral(value))
           if !varcharKeys.contains(a.name) =>
-        s"""${a.name} ${op.symbol} ${quoteStringLiteral(v.toString)}"""
-      case op @ BinaryComparison(Literal(v, _: StringType), a: Attribute)
+        s"${a.name} ${op.symbol} $value"
+      case op @ BinaryComparison(ExtractableLiteral(value), a: Attribute)
           if !varcharKeys.contains(a.name) =>
-        s"""${quoteStringLiteral(v.toString)} ${op.symbol} ${a.name}"""
-    }.mkString(" and ")
+        s"$value ${op.symbol} ${a.name}"
+      case op @ And(expr1, expr2)
+          if convert.isDefinedAt(expr1) || convert.isDefinedAt(expr2) =>
+        (convert.lift(expr1) ++ convert.lift(expr2)).mkString("(", " and ", ")")
+      case op @ Or(expr1, expr2)
+          if convert.isDefinedAt(expr1) && convert.isDefinedAt(expr2) =>
+        s"(${convert(expr1)} or ${convert(expr2)})"
+    }
+
+    filters.map(convert.lift).collect { case Some(filterString) => filterString }.mkString(" and ")
   }
 
   private def quoteStringLiteral(str: String): String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/a4baa8f4/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
index e85ea5a..ae804ce 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
@@ -25,9 +25,7 @@ import org.apache.hadoop.util.VersionInfo
 import org.apache.spark.SparkConf
 import org.apache.spark.util.Utils
 
-private[client] class HiveClientBuilder {
-  private val sparkConf = new SparkConf()
-
+private[client] object HiveClientBuilder {
   // In order to speed up test execution during development or in Jenkins, you can specify the path
   // of an existing Ivy cache:
   private val ivyPath: Option[String] = {
@@ -52,7 +50,7 @@ private[client] class HiveClientBuilder {
     IsolatedClientLoader.forVersion(
       hiveMetastoreVersion = version,
       hadoopVersion = VersionInfo.getVersion,
-      sparkConf = sparkConf,
+      sparkConf = new SparkConf(),
       hadoopConf = hadoopConf,
       config = buildConf(extraConf),
       ivyPath = ivyPath).createClient()

http://git-wip-us.apache.org/repos/asf/spark/blob/a4baa8f4/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
index 4790331..6a2c23a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -19,21 +19,25 @@ package org.apache.spark.sql.hive.client
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.conf.HiveConf
+import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EmptyRow, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, InSet, LessThan, LessThanOrEqual, Like, Literal, Or}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.types.{ByteType, IntegerType, StringType}
 
-class HiveClientSuite extends SparkFunSuite {
-  private val clientBuilder = new HiveClientBuilder
+// TODO: Refactor this to `HivePartitionFilteringSuite`
+class HiveClientSuite(version: String)
+    extends HiveVersionSuite(version) with BeforeAndAfterAll {
+  import CatalystSqlParser._
 
   private val tryDirectSqlKey = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname
 
-  test(s"getPartitionsByFilter returns all partitions when $tryDirectSqlKey=false") {
-    val testPartitionCount = 5
+  private val testPartitionCount = 3 * 24 * 4
 
+  private def init(tryDirectSql: Boolean): HiveClient = {
     val storageFormat = CatalogStorageFormat(
       locationUri = None,
       inputFormat = None,
@@ -43,19 +47,214 @@ class HiveClientSuite extends SparkFunSuite {
       properties = Map.empty)
 
     val hadoopConf = new Configuration()
-    hadoopConf.setBoolean(tryDirectSqlKey, false)
-    val client = clientBuilder.buildClient(HiveUtils.hiveExecutionVersion, hadoopConf)
-    client.runSqlHive("CREATE TABLE test (value INT) PARTITIONED BY (part INT)")
+    hadoopConf.setBoolean(tryDirectSqlKey, tryDirectSql)
+    val client = buildClient(hadoopConf)
+    client
+      .runSqlHive("CREATE TABLE test (value INT) PARTITIONED BY (ds INT, h INT, chunk STRING)")
+
+    val partitions =
+      for {
+        ds <- 20170101 to 20170103
+        h <- 0 to 23
+        chunk <- Seq("aa", "ab", "ba", "bb")
+      } yield CatalogTablePartition(Map(
+        "ds" -> ds.toString,
+        "h" -> h.toString,
+        "chunk" -> chunk
+      ), storageFormat)
+    assert(partitions.size == testPartitionCount)
 
-    val partitions = (1 to testPartitionCount).map { part =>
-      CatalogTablePartition(Map("part" -> part.toString), storageFormat)
-    }
     client.createPartitions(
       "default", "test", partitions, ignoreIfExists = false)
+    client
+  }
 
+  override def beforeAll() {
+    client = init(true)
+  }
+
+  test(s"getPartitionsByFilter returns all partitions when $tryDirectSqlKey=false") {
+    val client = init(false)
     val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"),
-      Seq(EqualTo(AttributeReference("part", IntegerType)(), Literal(3))))
+      Seq(parseExpression("ds=20170101")))
 
     assert(filteredPartitions.size == testPartitionCount)
   }
+
+  test("getPartitionsByFilter: ds=20170101") {
+    testMetastorePartitionFiltering(
+      "ds=20170101",
+      20170101 to 20170101,
+      0 to 23,
+      "aa" :: "ab" :: "ba" :: "bb" :: Nil)
+  }
+
+  test("getPartitionsByFilter: ds=(20170101 + 1) and h=0") {
+    // Should return all partitions where h=0 because getPartitionsByFilter does not support
+    // comparisons to non-literal values
+    testMetastorePartitionFiltering(
+      "ds=(20170101 + 1) and h=0",
+      20170101 to 20170103,
+      0 to 0,
+      "aa" :: "ab" :: "ba" :: "bb" :: Nil)
+  }
+
+  test("getPartitionsByFilter: chunk='aa'") {
+    testMetastorePartitionFiltering(
+      "chunk='aa'",
+      20170101 to 20170103,
+      0 to 23,
+      "aa" :: Nil)
+  }
+
+  test("getPartitionsByFilter: 20170101=ds") {
+    testMetastorePartitionFiltering(
+      "20170101=ds",
+      20170101 to 20170101,
+      0 to 23,
+      "aa" :: "ab" :: "ba" :: "bb" :: Nil)
+  }
+
+  test("getPartitionsByFilter: ds=20170101 and h=10") {
+    testMetastorePartitionFiltering(
+      "ds=20170101 and h=10",
+      20170101 to 20170101,
+      10 to 10,
+      "aa" :: "ab" :: "ba" :: "bb" :: Nil)
+  }
+
+  test("getPartitionsByFilter: ds=20170101 or ds=20170102") {
+    testMetastorePartitionFiltering(
+      "ds=20170101 or ds=20170102",
+      20170101 to 20170102,
+      0 to 23,
+      "aa" :: "ab" :: "ba" :: "bb" :: Nil)
+  }
+
+  test("getPartitionsByFilter: ds in (20170102, 20170103) (using IN expression)") {
+    testMetastorePartitionFiltering(
+      "ds in (20170102, 20170103)",
+      20170102 to 20170103,
+      0 to 23,
+      "aa" :: "ab" :: "ba" :: "bb" :: Nil)
+  }
+
+  test("getPartitionsByFilter: ds in (20170102, 20170103) (using INSET expression)") {
+    testMetastorePartitionFiltering(
+      "ds in (20170102, 20170103)",
+      20170102 to 20170103,
+      0 to 23,
+      "aa" :: "ab" :: "ba" :: "bb" :: Nil, {
+        case expr @ In(v, list) if expr.inSetConvertible =>
+          InSet(v, Set() ++ list.map(_.eval(EmptyRow)))
+      })
+  }
+
+  test("getPartitionsByFilter: chunk in ('ab', 'ba') (using IN expression)") {
+    testMetastorePartitionFiltering(
+      "chunk in ('ab', 'ba')",
+      20170101 to 20170103,
+      0 to 23,
+      "ab" :: "ba" :: Nil)
+  }
+
+  test("getPartitionsByFilter: chunk in ('ab', 'ba') (using INSET expression)") {
+    testMetastorePartitionFiltering(
+      "chunk in ('ab', 'ba')",
+      20170101 to 20170103,
+      0 to 23,
+      "ab" :: "ba" :: Nil, {
+        case expr @ In(v, list) if expr.inSetConvertible =>
+          InSet(v, Set() ++ list.map(_.eval(EmptyRow)))
+      })
+  }
+
+  test("getPartitionsByFilter: (ds=20170101 and h>=8) or (ds=20170102 and h<8)") {
+    val day1 = (20170101 to 20170101, 8 to 23, Seq("aa", "ab", "ba", "bb"))
+    val day2 = (20170102 to 20170102, 0 to 7, Seq("aa", "ab", "ba", "bb"))
+    testMetastorePartitionFiltering(
+      "(ds=20170101 and h>=8) or (ds=20170102 and h<8)",
+      day1 :: day2 :: Nil)
+  }
+
+  test("getPartitionsByFilter: (ds=20170101 and h>=8) or (ds=20170102 and h<(7+1))") {
+    val day1 = (20170101 to 20170101, 8 to 23, Seq("aa", "ab", "ba", "bb"))
+    // Day 2 should include all hours because we can't build a filter for h<(7+1)
+    val day2 = (20170102 to 20170102, 0 to 23, Seq("aa", "ab", "ba", "bb"))
+    testMetastorePartitionFiltering(
+      "(ds=20170101 and h>=8) or (ds=20170102 and h<(7+1))",
+      day1 :: day2 :: Nil)
+  }
+
+  test("getPartitionsByFilter: " +
+      "chunk in ('ab', 'ba') and ((ds=20170101 and h>=8) or (ds=20170102 and h<8))") {
+    val day1 = (20170101 to 20170101, 8 to 23, Seq("ab", "ba"))
+    val day2 = (20170102 to 20170102, 0 to 7, Seq("ab", "ba"))
+    testMetastorePartitionFiltering(
+      "chunk in ('ab', 'ba') and ((ds=20170101 and h>=8) or (ds=20170102 and h<8))",
+      day1 :: day2 :: Nil)
+  }
+
+  private def testMetastorePartitionFiltering(
+      filterString: String,
+      expectedDs: Seq[Int],
+      expectedH: Seq[Int],
+      expectedChunks: Seq[String]): Unit = {
+    testMetastorePartitionFiltering(
+      filterString,
+      (expectedDs, expectedH, expectedChunks) :: Nil,
+      identity)
+  }
+
+  private def testMetastorePartitionFiltering(
+      filterString: String,
+      expectedDs: Seq[Int],
+      expectedH: Seq[Int],
+      expectedChunks: Seq[String],
+      transform: Expression => Expression): Unit = {
+    testMetastorePartitionFiltering(
+      filterString,
+      (expectedDs, expectedH, expectedChunks) :: Nil,
+      transform)
+  }
+
+  private def testMetastorePartitionFiltering(
+      filterString: String,
+      expectedPartitionCubes: Seq[(Seq[Int], Seq[Int], Seq[String])]): Unit = {
+    testMetastorePartitionFiltering(filterString, expectedPartitionCubes, identity)
+  }
+
+  private def testMetastorePartitionFiltering(
+      filterString: String,
+      expectedPartitionCubes: Seq[(Seq[Int], Seq[Int], Seq[String])],
+      transform: Expression => Expression): Unit = {
+    val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"),
+      Seq(
+        transform(parseExpression(filterString))
+      ))
+
+    val expectedPartitionCount = expectedPartitionCubes.map {
+      case (expectedDs, expectedH, expectedChunks) =>
+        expectedDs.size * expectedH.size * expectedChunks.size
+    }.sum
+
+    val expectedPartitions = expectedPartitionCubes.map {
+      case (expectedDs, expectedH, expectedChunks) =>
+        for {
+          ds <- expectedDs
+          h <- expectedH
+          chunk <- expectedChunks
+        } yield Set(
+          "ds" -> ds.toString,
+          "h" -> h.toString,
+          "chunk" -> chunk
+        )
+    }.reduce(_ ++ _)
+
+    val actualFilteredPartitionCount = filteredPartitions.size
+
+    assert(actualFilteredPartitionCount == expectedPartitionCount,
+      s"Expected $expectedPartitionCount partitions but got $actualFilteredPartitionCount")
+    assert(filteredPartitions.map(_.spec.toSet).toSet == expectedPartitions.toSet)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a4baa8f4/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuites.scala
new file mode 100644
index 0000000..de1be21
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuites.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.client
+
+import scala.collection.immutable.IndexedSeq
+
+import org.scalatest.Suite
+
+class HiveClientSuites extends Suite with HiveClientVersions {
+  override def nestedSuites: IndexedSeq[Suite] = {
+    // Hive 0.12 does not provide the partition filtering API we call
+    versions.filterNot(_ == "0.12").map(new HiveClientSuite(_))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a4baa8f4/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
new file mode 100644
index 0000000..2e7dfde
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.client
+
+import scala.collection.immutable.IndexedSeq
+
+import org.apache.spark.SparkFunSuite
+
+private[client] trait HiveClientVersions {
+  protected val versions = IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1")
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a4baa8f4/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
new file mode 100644
index 0000000..986c667
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.client
+
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.Tag
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.hive.HiveUtils
+
+private[client] abstract class HiveVersionSuite(version: String) extends SparkFunSuite {
+  protected var client: HiveClient = null
+
+  protected def buildClient(hadoopConf: Configuration): HiveClient = {
+    // Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
+    // hive.metastore.schema.verification from false to true since 2.0
+    // For details, see the JIRA HIVE-6113 and HIVE-12463
+    if (version == "2.0" || version == "2.1") {
+      hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
+      hadoopConf.set("hive.metastore.schema.verification", "false")
+    }
+    HiveClientBuilder
+      .buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
+  }
+
+  override def suiteName: String = s"${super.suiteName}($version)"
+
+  override protected def test(testName: String, testTags: Tag*)(testFun: => Unit): Unit = {
+    super.test(s"$version: $testName", testTags: _*)(testFun)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a4baa8f4/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index f109843..82fbdd64 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -47,11 +47,11 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
  * sure that reflective calls are not throwing NoSuchMethod error, but the actually functionality
  * is not fully tested.
  */
+// TODO: Refactor this to `HiveClientSuite` and make it a subclass of `HiveVersionSuite`
 @ExtendedHiveTest
 class VersionsSuite extends SparkFunSuite with Logging {
 
-  private val clientBuilder = new HiveClientBuilder
-  import clientBuilder.buildClient
+  import HiveClientBuilder.buildClient
 
   /**
    * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`

