You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/01/30 01:57:37 UTC

spark git commit: [SPARK-13076][SQL] Rename ClientInterface -> HiveClient

Repository: spark
Updated Branches:
  refs/heads/master e38b0baa3 -> 2cbc41282


[SPARK-13076][SQL] Rename ClientInterface -> HiveClient

Also renames ClientWrapper -> HiveClientImpl.

I have some follow-up pull requests to introduce a new internal catalog, and I think this new naming better reflects the functionality of the two classes.

Author: Reynold Xin <rx...@databricks.com>

Closes #10981 from rxin/SPARK-13076.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2cbc4128
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2cbc4128
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2cbc4128

Branch: refs/heads/master
Commit: 2cbc412821641cf9446c0621ffa1976bd7fc4fa1
Parents: e38b0ba
Author: Reynold Xin <rx...@databricks.com>
Authored: Fri Jan 29 16:57:34 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Fri Jan 29 16:57:34 2016 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/execution/commands.scala   |   2 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala    |   2 +-
 .../org/apache/spark/sql/hive/HiveContext.scala |  10 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |   2 +-
 .../spark/sql/hive/client/ClientInterface.scala | 195 -------
 .../spark/sql/hive/client/ClientWrapper.scala   | 544 -------------------
 .../spark/sql/hive/client/HiveClient.scala      | 195 +++++++
 .../spark/sql/hive/client/HiveClientImpl.scala  | 544 +++++++++++++++++++
 .../apache/spark/sql/hive/client/HiveShim.scala |   5 +-
 .../sql/hive/client/IsolatedClientLoader.scala  |  18 +-
 .../org/apache/spark/sql/hive/hiveUDFs.scala    |   4 +-
 .../apache/spark/sql/hive/test/TestHive.scala   |   4 +-
 .../spark/sql/hive/client/VersionsSuite.scala   |   4 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |   2 +-
 14 files changed, 765 insertions(+), 766 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 703e464..c6adb58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -404,7 +404,7 @@ case class DescribeFunction(
           result
         }
 
-      case None => Seq(Row(s"Function: $functionName is not found."))
+      case None => Seq(Row(s"Function: $functionName not found."))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 51a50c1..2b821c1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -84,7 +84,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       "Extended Usage")
 
     checkExistence(sql("describe functioN abcadf"), true,
-      "Function: abcadf is not found.")
+      "Function: abcadf not found.")
   }
 
   test("SPARK-6743: no columns from cache") {

http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 1797ea5..05863ae 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -79,8 +79,8 @@ class HiveContext private[hive](
     sc: SparkContext,
     cacheManager: CacheManager,
     listener: SQLListener,
-    @transient private val execHive: ClientWrapper,
-    @transient private val metaHive: ClientInterface,
+    @transient private val execHive: HiveClientImpl,
+    @transient private val metaHive: HiveClient,
     isRootContext: Boolean)
   extends SQLContext(sc, cacheManager, listener, isRootContext) with Logging {
   self =>
@@ -193,7 +193,7 @@ class HiveContext private[hive](
    * for storing persistent metadata, and only point to a dummy metastore in a temporary directory.
    */
   @transient
-  protected[hive] lazy val executionHive: ClientWrapper = if (execHive != null) {
+  protected[hive] lazy val executionHive: HiveClientImpl = if (execHive != null) {
     execHive
   } else {
     logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
@@ -203,7 +203,7 @@ class HiveContext private[hive](
       config = newTemporaryConfiguration(useInMemoryDerby = true),
       isolationOn = false,
       baseClassLoader = Utils.getContextOrSparkClassLoader)
-    loader.createClient().asInstanceOf[ClientWrapper]
+    loader.createClient().asInstanceOf[HiveClientImpl]
   }
 
   /**
@@ -222,7 +222,7 @@ class HiveContext private[hive](
    * in the hive-site.xml file.
    */
   @transient
-  protected[hive] lazy val metadataHive: ClientInterface = if (metaHive != null) {
+  protected[hive] lazy val metadataHive: HiveClient = if (metaHive != null) {
     metaHive
   } else {
     val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)

http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 848aa4e..61d0d67 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -96,7 +96,7 @@ private[hive] object HiveSerDe {
   }
 }
 
-private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: HiveContext)
+private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveContext)
   extends Catalog with Logging {
 
   val conf = hive.conf

http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
deleted file mode 100644
index 4eec3fe..0000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.client
-
-import java.io.PrintStream
-import java.util.{Map => JMap}
-import javax.annotation.Nullable
-
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
-import org.apache.spark.sql.catalyst.expressions.Expression
-
-private[hive] case class HiveDatabase(name: String, location: String)
-
-private[hive] abstract class TableType { val name: String }
-private[hive] case object ExternalTable extends TableType { override val name = "EXTERNAL_TABLE" }
-private[hive] case object IndexTable extends TableType { override val name = "INDEX_TABLE" }
-private[hive] case object ManagedTable extends TableType { override val name = "MANAGED_TABLE" }
-private[hive] case object VirtualView extends TableType { override val name = "VIRTUAL_VIEW" }
-
-// TODO: Use this for Tables and Partitions
-private[hive] case class HiveStorageDescriptor(
-    location: String,
-    inputFormat: String,
-    outputFormat: String,
-    serde: String,
-    serdeProperties: Map[String, String])
-
-private[hive] case class HivePartition(
-    values: Seq[String],
-    storage: HiveStorageDescriptor)
-
-private[hive] case class HiveColumn(name: String, @Nullable hiveType: String, comment: String)
-private[hive] case class HiveTable(
-    specifiedDatabase: Option[String],
-    name: String,
-    schema: Seq[HiveColumn],
-    partitionColumns: Seq[HiveColumn],
-    properties: Map[String, String],
-    serdeProperties: Map[String, String],
-    tableType: TableType,
-    location: Option[String] = None,
-    inputFormat: Option[String] = None,
-    outputFormat: Option[String] = None,
-    serde: Option[String] = None,
-    viewText: Option[String] = None) {
-
-  @transient
-  private[client] var client: ClientInterface = _
-
-  private[client] def withClient(ci: ClientInterface): this.type = {
-    client = ci
-    this
-  }
-
-  def database: String = specifiedDatabase.getOrElse(sys.error("database not resolved"))
-
-  def isPartitioned: Boolean = partitionColumns.nonEmpty
-
-  def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this)
-
-  def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
-    client.getPartitionsByFilter(this, predicates)
-
-  // Hive does not support backticks when passing names to the client.
-  def qualifiedName: String = s"$database.$name"
-}
-
-/**
- * An externally visible interface to the Hive client.  This interface is shared across both the
- * internal and external classloaders for a given version of Hive and thus must expose only
- * shared classes.
- */
-private[hive] trait ClientInterface {
-
-  /** Returns the Hive Version of this client. */
-  def version: HiveVersion
-
-  /** Returns the configuration for the given key in the current session. */
-  def getConf(key: String, defaultValue: String): String
-
-  /**
-   * Runs a HiveQL command using Hive, returning the results as a list of strings.  Each row will
-   * result in one string.
-   */
-  def runSqlHive(sql: String): Seq[String]
-
-  def setOut(stream: PrintStream): Unit
-  def setInfo(stream: PrintStream): Unit
-  def setError(stream: PrintStream): Unit
-
-  /** Returns the names of all tables in the given database. */
-  def listTables(dbName: String): Seq[String]
-
-  /** Returns the name of the active database. */
-  def currentDatabase: String
-
-  /** Sets the name of current database. */
-  def setCurrentDatabase(databaseName: String): Unit
-
-  /** Returns the metadata for specified database, throwing an exception if it doesn't exist */
-  def getDatabase(name: String): HiveDatabase = {
-    getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException)
-  }
-
-  /** Returns the metadata for a given database, or None if it doesn't exist. */
-  def getDatabaseOption(name: String): Option[HiveDatabase]
-
-  /** Returns the specified table, or throws [[NoSuchTableException]]. */
-  def getTable(dbName: String, tableName: String): HiveTable = {
-    getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException)
-  }
-
-  /** Returns the metadata for the specified table or None if it doens't exist. */
-  def getTableOption(dbName: String, tableName: String): Option[HiveTable]
-
-  /** Creates a view with the given metadata. */
-  def createView(view: HiveTable): Unit
-
-  /** Updates the given view with new metadata. */
-  def alertView(view: HiveTable): Unit
-
-  /** Creates a table with the given metadata. */
-  def createTable(table: HiveTable): Unit
-
-  /** Updates the given table with new metadata. */
-  def alterTable(table: HiveTable): Unit
-
-  /** Creates a new database with the given name. */
-  def createDatabase(database: HiveDatabase): Unit
-
-  /** Returns the specified paritition or None if it does not exist. */
-  def getPartitionOption(
-      hTable: HiveTable,
-      partitionSpec: JMap[String, String]): Option[HivePartition]
-
-  /** Returns all partitions for the given table. */
-  def getAllPartitions(hTable: HiveTable): Seq[HivePartition]
-
-  /** Returns partitions filtered by predicates for the given table. */
-  def getPartitionsByFilter(hTable: HiveTable, predicates: Seq[Expression]): Seq[HivePartition]
-
-  /** Loads a static partition into an existing table. */
-  def loadPartition(
-      loadPath: String,
-      tableName: String,
-      partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
-      replace: Boolean,
-      holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit
-
-  /** Loads data into an existing table. */
-  def loadTable(
-      loadPath: String, // TODO URI
-      tableName: String,
-      replace: Boolean,
-      holdDDLTime: Boolean): Unit
-
-  /** Loads new dynamic partitions into an existing table. */
-  def loadDynamicPartitions(
-      loadPath: String,
-      tableName: String,
-      partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
-      replace: Boolean,
-      numDP: Int,
-      holdDDLTime: Boolean,
-      listBucketingEnabled: Boolean): Unit
-
-  /** Add a jar into class loader */
-  def addJar(path: String): Unit
-
-  /** Return a ClientInterface as new session, that will share the class loader and Hive client */
-  def newSession(): ClientInterface
-
-  /** Run a function within Hive state (SessionState, HiveConf, Hive client and class loader) */
-  def withHiveState[A](f: => A): A
-
-  /** Used for testing only.  Removes all metadata from this instance of Hive. */
-  def reset(): Unit
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
deleted file mode 100644
index 5307e92..0000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ /dev/null
@@ -1,544 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.client
-
-import java.io.{File, PrintStream}
-import java.util.{Map => JMap}
-
-import scala.collection.JavaConverters._
-import scala.language.reflectiveCalls
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.{TableType => HTableType}
-import org.apache.hadoop.hive.metastore.api.{Database, FieldSchema}
-import org.apache.hadoop.hive.ql.{metadata, Driver}
-import org.apache.hadoop.hive.ql.metadata.Hive
-import org.apache.hadoop.hive.ql.processors._
-import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
-import org.apache.hadoop.security.UserGroupInformation
-
-import org.apache.spark.{Logging, SparkConf, SparkException}
-import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.util.{CircularBuffer, Utils}
-
-/**
- * A class that wraps the HiveClient and converts its responses to externally visible classes.
- * Note that this class is typically loaded with an internal classloader for each instantiation,
- * allowing it to interact directly with a specific isolated version of Hive.  Loading this class
- * with the isolated classloader however will result in it only being visible as a ClientInterface,
- * not a ClientWrapper.
- *
- * This class needs to interact with multiple versions of Hive, but will always be compiled with
- * the 'native', execution version of Hive.  Therefore, any places where hive breaks compatibility
- * must use reflection after matching on `version`.
- *
- * @param version the version of hive used when pick function calls that are not compatible.
- * @param config  a collection of configuration options that will be added to the hive conf before
- *                opening the hive client.
- * @param initClassLoader the classloader used when creating the `state` field of
- *                        this ClientWrapper.
- */
-private[hive] class ClientWrapper(
-    override val version: HiveVersion,
-    config: Map[String, String],
-    initClassLoader: ClassLoader,
-    val clientLoader: IsolatedClientLoader)
-  extends ClientInterface
-  with Logging {
-
-  // Circular buffer to hold what hive prints to STDOUT and ERR.  Only printed when failures occur.
-  private val outputBuffer = new CircularBuffer()
-
-  private val shim = version match {
-    case hive.v12 => new Shim_v0_12()
-    case hive.v13 => new Shim_v0_13()
-    case hive.v14 => new Shim_v0_14()
-    case hive.v1_0 => new Shim_v1_0()
-    case hive.v1_1 => new Shim_v1_1()
-    case hive.v1_2 => new Shim_v1_2()
-  }
-
-  // Create an internal session state for this ClientWrapper.
-  val state = {
-    val original = Thread.currentThread().getContextClassLoader
-    // Switch to the initClassLoader.
-    Thread.currentThread().setContextClassLoader(initClassLoader)
-
-    // Set up kerberos credentials for UserGroupInformation.loginUser within
-    // current class loader
-    // Instead of using the spark conf of the current spark context, a new
-    // instance of SparkConf is needed for the original value of spark.yarn.keytab
-    // and spark.yarn.principal set in SparkSubmit, as yarn.Client resets the
-    // keytab configuration for the link name in distributed cache
-    val sparkConf = new SparkConf
-    if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
-      val principalName = sparkConf.get("spark.yarn.principal")
-      val keytabFileName = sparkConf.get("spark.yarn.keytab")
-      if (!new File(keytabFileName).exists()) {
-        throw new SparkException(s"Keytab file: ${keytabFileName}" +
-          " specified in spark.yarn.keytab does not exist")
-      } else {
-        logInfo("Attempting to login to Kerberos" +
-          s" using principal: ${principalName} and keytab: ${keytabFileName}")
-        UserGroupInformation.loginUserFromKeytab(principalName, keytabFileName)
-      }
-    }
-
-    val ret = try {
-      val initialConf = new HiveConf(classOf[SessionState])
-      // HiveConf is a Hadoop Configuration, which has a field of classLoader and
-      // the initial value will be the current thread's context class loader
-      // (i.e. initClassLoader at here).
-      // We call initialConf.setClassLoader(initClassLoader) at here to make
-      // this action explicit.
-      initialConf.setClassLoader(initClassLoader)
-      config.foreach { case (k, v) =>
-        if (k.toLowerCase.contains("password")) {
-          logDebug(s"Hive Config: $k=xxx")
-        } else {
-          logDebug(s"Hive Config: $k=$v")
-        }
-        initialConf.set(k, v)
-      }
-      val state = new SessionState(initialConf)
-      if (clientLoader.cachedHive != null) {
-        Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
-      }
-      SessionState.start(state)
-      state.out = new PrintStream(outputBuffer, true, "UTF-8")
-      state.err = new PrintStream(outputBuffer, true, "UTF-8")
-      state
-    } finally {
-      Thread.currentThread().setContextClassLoader(original)
-    }
-    ret
-  }
-
-  /** Returns the configuration for the current session. */
-  def conf: HiveConf = SessionState.get().getConf
-
-  override def getConf(key: String, defaultValue: String): String = {
-    conf.get(key, defaultValue)
-  }
-
-  // We use hive's conf for compatibility.
-  private val retryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES)
-  private val retryDelayMillis = shim.getMetastoreClientConnectRetryDelayMillis(conf)
-
-  /**
-   * Runs `f` with multiple retries in case the hive metastore is temporarily unreachable.
-   */
-  private def retryLocked[A](f: => A): A = clientLoader.synchronized {
-    // Hive sometimes retries internally, so set a deadline to avoid compounding delays.
-    val deadline = System.nanoTime + (retryLimit * retryDelayMillis * 1e6).toLong
-    var numTries = 0
-    var caughtException: Exception = null
-    do {
-      numTries += 1
-      try {
-        return f
-      } catch {
-        case e: Exception if causedByThrift(e) =>
-          caughtException = e
-          logWarning(
-            "HiveClientWrapper got thrift exception, destroying client and retrying " +
-              s"(${retryLimit - numTries} tries remaining)", e)
-          clientLoader.cachedHive = null
-          Thread.sleep(retryDelayMillis)
-      }
-    } while (numTries <= retryLimit && System.nanoTime < deadline)
-    if (System.nanoTime > deadline) {
-      logWarning("Deadline exceeded")
-    }
-    throw caughtException
-  }
-
-  private def causedByThrift(e: Throwable): Boolean = {
-    var target = e
-    while (target != null) {
-      val msg = target.getMessage()
-      if (msg != null && msg.matches("(?s).*(TApplication|TProtocol|TTransport)Exception.*")) {
-        return true
-      }
-      target = target.getCause()
-    }
-    false
-  }
-
-  def client: Hive = {
-    if (clientLoader.cachedHive != null) {
-      clientLoader.cachedHive.asInstanceOf[Hive]
-    } else {
-      val c = Hive.get(conf)
-      clientLoader.cachedHive = c
-      c
-    }
-  }
-
-  /**
-   * Runs `f` with ThreadLocal session state and classloaders configured for this version of hive.
-   */
-  def withHiveState[A](f: => A): A = retryLocked {
-    val original = Thread.currentThread().getContextClassLoader
-    // Set the thread local metastore client to the client associated with this ClientWrapper.
-    Hive.set(client)
-    // The classloader in clientLoader could be changed after addJar, always use the latest
-    // classloader
-    state.getConf.setClassLoader(clientLoader.classLoader)
-    // setCurrentSessionState will use the classLoader associated
-    // with the HiveConf in `state` to override the context class loader of the current
-    // thread.
-    shim.setCurrentSessionState(state)
-    val ret = try f finally {
-      Thread.currentThread().setContextClassLoader(original)
-    }
-    ret
-  }
-
-  def setOut(stream: PrintStream): Unit = withHiveState {
-    state.out = stream
-  }
-
-  def setInfo(stream: PrintStream): Unit = withHiveState {
-    state.info = stream
-  }
-
-  def setError(stream: PrintStream): Unit = withHiveState {
-    state.err = stream
-  }
-
-  override def currentDatabase: String = withHiveState {
-    state.getCurrentDatabase
-  }
-
-  override def setCurrentDatabase(databaseName: String): Unit = withHiveState {
-    if (getDatabaseOption(databaseName).isDefined) {
-      state.setCurrentDatabase(databaseName)
-    } else {
-      throw new NoSuchDatabaseException
-    }
-  }
-
-  override def createDatabase(database: HiveDatabase): Unit = withHiveState {
-    client.createDatabase(
-      new Database(
-        database.name,
-        "",
-        new File(database.location).toURI.toString,
-        new java.util.HashMap),
-        true)
-  }
-
-  override def getDatabaseOption(name: String): Option[HiveDatabase] = withHiveState {
-    Option(client.getDatabase(name)).map { d =>
-      HiveDatabase(
-        name = d.getName,
-        location = d.getLocationUri)
-    }
-  }
-
-  override def getTableOption(
-      dbName: String,
-      tableName: String): Option[HiveTable] = withHiveState {
-
-    logDebug(s"Looking up $dbName.$tableName")
-
-    val hiveTable = Option(client.getTable(dbName, tableName, false))
-    val converted = hiveTable.map { h =>
-
-      HiveTable(
-        name = h.getTableName,
-        specifiedDatabase = Option(h.getDbName),
-        schema = h.getCols.asScala.map(f => HiveColumn(f.getName, f.getType, f.getComment)),
-        partitionColumns = h.getPartCols.asScala.map(f =>
-          HiveColumn(f.getName, f.getType, f.getComment)),
-        properties = h.getParameters.asScala.toMap,
-        serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap,
-        tableType = h.getTableType match {
-          case HTableType.MANAGED_TABLE => ManagedTable
-          case HTableType.EXTERNAL_TABLE => ExternalTable
-          case HTableType.VIRTUAL_VIEW => VirtualView
-          case HTableType.INDEX_TABLE => IndexTable
-        },
-        location = shim.getDataLocation(h),
-        inputFormat = Option(h.getInputFormatClass).map(_.getName),
-        outputFormat = Option(h.getOutputFormatClass).map(_.getName),
-        serde = Option(h.getSerializationLib),
-        viewText = Option(h.getViewExpandedText)).withClient(this)
-    }
-    converted
-  }
-
-  private def toInputFormat(name: String) =
-    Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
-
-  private def toOutputFormat(name: String) =
-    Utils.classForName(name)
-      .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
-
-  private def toQlTable(table: HiveTable): metadata.Table = {
-    val qlTable = new metadata.Table(table.database, table.name)
-
-    qlTable.setFields(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
-    qlTable.setPartCols(
-      table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
-    table.properties.foreach { case (k, v) => qlTable.setProperty(k, v) }
-    table.serdeProperties.foreach { case (k, v) => qlTable.setSerdeParam(k, v) }
-
-    // set owner
-    qlTable.setOwner(conf.getUser)
-    // set create time
-    qlTable.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
-
-    table.location.foreach { loc => shim.setDataLocation(qlTable, loc) }
-    table.inputFormat.map(toInputFormat).foreach(qlTable.setInputFormatClass)
-    table.outputFormat.map(toOutputFormat).foreach(qlTable.setOutputFormatClass)
-    table.serde.foreach(qlTable.setSerializationLib)
-
-    qlTable
-  }
-
-  private def toViewTable(view: HiveTable): metadata.Table = {
-    // TODO: this is duplicated with `toQlTable` except the table type stuff.
-    val tbl = new metadata.Table(view.database, view.name)
-    tbl.setTableType(HTableType.VIRTUAL_VIEW)
-    tbl.setSerializationLib(null)
-    tbl.clearSerDeInfo()
-
-    // TODO: we will save the same SQL string to original and expanded text, which is different
-    // from Hive.
-    tbl.setViewOriginalText(view.viewText.get)
-    tbl.setViewExpandedText(view.viewText.get)
-
-    tbl.setFields(view.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
-    view.properties.foreach { case (k, v) => tbl.setProperty(k, v) }
-
-    // set owner
-    tbl.setOwner(conf.getUser)
-    // set create time
-    tbl.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
-
-    tbl
-  }
-
-  override def createView(view: HiveTable): Unit = withHiveState {
-    client.createTable(toViewTable(view))
-  }
-
-  override def alertView(view: HiveTable): Unit = withHiveState {
-    client.alterTable(view.qualifiedName, toViewTable(view))
-  }
-
-  override def createTable(table: HiveTable): Unit = withHiveState {
-    val qlTable = toQlTable(table)
-    client.createTable(qlTable)
-  }
-
-  override def alterTable(table: HiveTable): Unit = withHiveState {
-    val qlTable = toQlTable(table)
-    client.alterTable(table.qualifiedName, qlTable)
-  }
-
-  private def toHivePartition(partition: metadata.Partition): HivePartition = {
-    val apiPartition = partition.getTPartition
-    HivePartition(
-      values = Option(apiPartition.getValues).map(_.asScala).getOrElse(Seq.empty),
-      storage = HiveStorageDescriptor(
-        location = apiPartition.getSd.getLocation,
-        inputFormat = apiPartition.getSd.getInputFormat,
-        outputFormat = apiPartition.getSd.getOutputFormat,
-        serde = apiPartition.getSd.getSerdeInfo.getSerializationLib,
-        serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap))
-  }
-
-  override def getPartitionOption(
-      table: HiveTable,
-      partitionSpec: JMap[String, String]): Option[HivePartition] = withHiveState {
-
-    val qlTable = toQlTable(table)
-    val qlPartition = client.getPartition(qlTable, partitionSpec, false)
-    Option(qlPartition).map(toHivePartition)
-  }
-
-  override def getAllPartitions(hTable: HiveTable): Seq[HivePartition] = withHiveState {
-    val qlTable = toQlTable(hTable)
-    shim.getAllPartitions(client, qlTable).map(toHivePartition)
-  }
-
-  override def getPartitionsByFilter(
-      hTable: HiveTable,
-      predicates: Seq[Expression]): Seq[HivePartition] = withHiveState {
-    val qlTable = toQlTable(hTable)
-    shim.getPartitionsByFilter(client, qlTable, predicates).map(toHivePartition)
-  }
-
-  override def listTables(dbName: String): Seq[String] = withHiveState {
-    client.getAllTables(dbName).asScala
-  }
-
-  /**
-   * Runs the specified SQL query using Hive.
-   */
-  override def runSqlHive(sql: String): Seq[String] = {
-    val maxResults = 100000
-    val results = runHive(sql, maxResults)
-    // It is very confusing when you only get back some of the results...
-    if (results.size == maxResults) sys.error("RESULTS POSSIBLY TRUNCATED")
-    results
-  }
-
-  /**
-   * Execute the command using Hive and return the results as a sequence. Each element
-   * in the sequence is one row.
-   */
-  protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = withHiveState {
-    logDebug(s"Running hiveql '$cmd'")
-    if (cmd.toLowerCase.startsWith("set")) { logDebug(s"Changing config: $cmd") }
-    try {
-      val cmd_trimmed: String = cmd.trim()
-      val tokens: Array[String] = cmd_trimmed.split("\\s+")
-      // The remainder of the command.
-      val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
-      val proc = shim.getCommandProcessor(tokens(0), conf)
-      proc match {
-        case driver: Driver =>
-          val response: CommandProcessorResponse = driver.run(cmd)
-          // Throw an exception if there is an error in query processing.
-          if (response.getResponseCode != 0) {
-            driver.close()
-            throw new QueryExecutionException(response.getErrorMessage)
-          }
-          driver.setMaxRows(maxRows)
-
-          val results = shim.getDriverResults(driver)
-          driver.close()
-          results
-
-        case _ =>
-          if (state.out != null) {
-            // scalastyle:off println
-            state.out.println(tokens(0) + " " + cmd_1)
-            // scalastyle:on println
-          }
-          Seq(proc.run(cmd_1).getResponseCode.toString)
-      }
-    } catch {
-      case e: Exception =>
-        logError(
-          s"""
-            |======================
-            |HIVE FAILURE OUTPUT
-            |======================
-            |${outputBuffer.toString}
-            |======================
-            |END HIVE FAILURE OUTPUT
-            |======================
-          """.stripMargin)
-        throw e
-    }
-  }
-
-  def loadPartition(
-      loadPath: String,
-      tableName: String,
-      partSpec: java.util.LinkedHashMap[String, String],
-      replace: Boolean,
-      holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit = withHiveState {
-    shim.loadPartition(
-      client,
-      new Path(loadPath), // TODO: Use URI
-      tableName,
-      partSpec,
-      replace,
-      holdDDLTime,
-      inheritTableSpecs,
-      isSkewedStoreAsSubdir)
-  }
-
-  def loadTable(
-      loadPath: String, // TODO URI
-      tableName: String,
-      replace: Boolean,
-      holdDDLTime: Boolean): Unit = withHiveState {
-    shim.loadTable(
-      client,
-      new Path(loadPath),
-      tableName,
-      replace,
-      holdDDLTime)
-  }
-
-  def loadDynamicPartitions(
-      loadPath: String,
-      tableName: String,
-      partSpec: java.util.LinkedHashMap[String, String],
-      replace: Boolean,
-      numDP: Int,
-      holdDDLTime: Boolean,
-      listBucketingEnabled: Boolean): Unit = withHiveState {
-    shim.loadDynamicPartitions(
-      client,
-      new Path(loadPath),
-      tableName,
-      partSpec,
-      replace,
-      numDP,
-      holdDDLTime,
-      listBucketingEnabled)
-  }
-
-  def addJar(path: String): Unit = {
-    val uri = new Path(path).toUri
-    val jarURL = if (uri.getScheme == null) {
-      // `path` is a local file path without a URL scheme
-      new File(path).toURI.toURL
-    } else {
-      // `path` is a URL with a scheme
-      uri.toURL
-    }
-    clientLoader.addJar(jarURL)
-    runSqlHive(s"ADD JAR $path")
-  }
-
-  def newSession(): ClientWrapper = {
-    clientLoader.createClient().asInstanceOf[ClientWrapper]
-  }
-
-  def reset(): Unit = withHiveState {
-    client.getAllTables("default").asScala.foreach { t =>
-        logDebug(s"Deleting table $t")
-        val table = client.getTable("default", t)
-        client.getIndexes("default", t, 255).asScala.foreach { index =>
-          shim.dropIndex(client, "default", t, index.getIndexName)
-        }
-        if (!table.isIndexTable) {
-          client.dropTable("default", t)
-        }
-      }
-      client.getAllDatabases.asScala.filterNot(_ == "default").foreach { db =>
-        logDebug(s"Dropping Database: $db")
-        client.dropDatabase(db, true, false, true)
-      }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
new file mode 100644
index 0000000..f681cc6
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.client
+
+import java.io.PrintStream
+import java.util.{Map => JMap}
+import javax.annotation.Nullable
+
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+private[hive] case class HiveDatabase(name: String, location: String)
+
+private[hive] abstract class TableType { val name: String }
+private[hive] case object ExternalTable extends TableType { override val name = "EXTERNAL_TABLE" }
+private[hive] case object IndexTable extends TableType { override val name = "INDEX_TABLE" }
+private[hive] case object ManagedTable extends TableType { override val name = "MANAGED_TABLE" }
+private[hive] case object VirtualView extends TableType { override val name = "VIRTUAL_VIEW" }
+
+// TODO: Use this for Tables and Partitions
+private[hive] case class HiveStorageDescriptor(
+    location: String,
+    inputFormat: String,
+    outputFormat: String,
+    serde: String,
+    serdeProperties: Map[String, String])
+
+private[hive] case class HivePartition(
+    values: Seq[String],
+    storage: HiveStorageDescriptor)
+
+private[hive] case class HiveColumn(name: String, @Nullable hiveType: String, comment: String)
+private[hive] case class HiveTable(
+    specifiedDatabase: Option[String],
+    name: String,
+    schema: Seq[HiveColumn],
+    partitionColumns: Seq[HiveColumn],
+    properties: Map[String, String],
+    serdeProperties: Map[String, String],
+    tableType: TableType,
+    location: Option[String] = None,
+    inputFormat: Option[String] = None,
+    outputFormat: Option[String] = None,
+    serde: Option[String] = None,
+    viewText: Option[String] = None) {
+
+  @transient
+  private[client] var client: HiveClient = _
+
+  private[client] def withClient(ci: HiveClient): this.type = {
+    client = ci
+    this
+  }
+
+  def database: String = specifiedDatabase.getOrElse(sys.error("database not resolved"))
+
+  def isPartitioned: Boolean = partitionColumns.nonEmpty
+
+  def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this)
+
+  def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
+    client.getPartitionsByFilter(this, predicates)
+
+  // Hive does not support backticks when passing names to the client.
+  def qualifiedName: String = s"$database.$name"
+}
+
+/**
+ * An externally visible interface to the Hive client.  This interface is shared across both the
+ * internal and external classloaders for a given version of Hive and thus must expose only
+ * shared classes.
+ */
+private[hive] trait HiveClient {
+
+  /** Returns the Hive Version of this client. */
+  def version: HiveVersion
+
+  /** Returns the configuration for the given key in the current session. */
+  def getConf(key: String, defaultValue: String): String
+
+  /**
+   * Runs a HiveQL command using Hive, returning the results as a list of strings.  Each row will
+   * result in one string.
+   */
+  def runSqlHive(sql: String): Seq[String]
+
+  def setOut(stream: PrintStream): Unit
+  def setInfo(stream: PrintStream): Unit
+  def setError(stream: PrintStream): Unit
+
+  /** Returns the names of all tables in the given database. */
+  def listTables(dbName: String): Seq[String]
+
+  /** Returns the name of the active database. */
+  def currentDatabase: String
+
+  /** Sets the name of current database. */
+  def setCurrentDatabase(databaseName: String): Unit
+
+  /** Returns the metadata for specified database, throwing an exception if it doesn't exist */
+  def getDatabase(name: String): HiveDatabase = {
+    getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException)
+  }
+
+  /** Returns the metadata for a given database, or None if it doesn't exist. */
+  def getDatabaseOption(name: String): Option[HiveDatabase]
+
+  /** Returns the specified table, or throws [[NoSuchTableException]]. */
+  def getTable(dbName: String, tableName: String): HiveTable = {
+    getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException)
+  }
+
+  /** Returns the metadata for the specified table or None if it doesn't exist. */
+  def getTableOption(dbName: String, tableName: String): Option[HiveTable]
+
+  /** Creates a view with the given metadata. */
+  def createView(view: HiveTable): Unit
+
+  /** Updates the given view with new metadata. */
+  def alertView(view: HiveTable): Unit
+
+  /** Creates a table with the given metadata. */
+  def createTable(table: HiveTable): Unit
+
+  /** Updates the given table with new metadata. */
+  def alterTable(table: HiveTable): Unit
+
+  /** Creates a new database with the given name. */
+  def createDatabase(database: HiveDatabase): Unit
+
+  /** Returns the specified partition or None if it does not exist. */
+  def getPartitionOption(
+      hTable: HiveTable,
+      partitionSpec: JMap[String, String]): Option[HivePartition]
+
+  /** Returns all partitions for the given table. */
+  def getAllPartitions(hTable: HiveTable): Seq[HivePartition]
+
+  /** Returns partitions filtered by predicates for the given table. */
+  def getPartitionsByFilter(hTable: HiveTable, predicates: Seq[Expression]): Seq[HivePartition]
+
+  /** Loads a static partition into an existing table. */
+  def loadPartition(
+      loadPath: String,
+      tableName: String,
+      partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
+      replace: Boolean,
+      holdDDLTime: Boolean,
+      inheritTableSpecs: Boolean,
+      isSkewedStoreAsSubdir: Boolean): Unit
+
+  /** Loads data into an existing table. */
+  def loadTable(
+      loadPath: String, // TODO URI
+      tableName: String,
+      replace: Boolean,
+      holdDDLTime: Boolean): Unit
+
+  /** Loads new dynamic partitions into an existing table. */
+  def loadDynamicPartitions(
+      loadPath: String,
+      tableName: String,
+      partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
+      replace: Boolean,
+      numDP: Int,
+      holdDDLTime: Boolean,
+      listBucketingEnabled: Boolean): Unit
+
+  /** Add a jar into class loader */
+  def addJar(path: String): Unit
+
+  /** Return a [[HiveClient]] as a new session that shares the class loader and Hive client */
+  def newSession(): HiveClient
+
+  /** Run a function within Hive state (SessionState, HiveConf, Hive client and class loader) */
+  def withHiveState[A](f: => A): A
+
+  /** Used for testing only.  Removes all metadata from this instance of Hive. */
+  def reset(): Unit
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
new file mode 100644
index 0000000..cf1ff55
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.client
+
+import java.io.{File, PrintStream}
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+import scala.language.reflectiveCalls
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.metastore.{TableType => HTableType}
+import org.apache.hadoop.hive.metastore.api.{Database, FieldSchema}
+import org.apache.hadoop.hive.ql.{metadata, Driver}
+import org.apache.hadoop.hive.ql.metadata.Hive
+import org.apache.hadoop.hive.ql.processors._
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.util.{CircularBuffer, Utils}
+
+/**
+ * A class that wraps Hive's own client and converts its responses to externally visible classes.
+ * Note that this class is typically loaded with an internal classloader for each instantiation,
+ * allowing it to interact directly with a specific isolated version of Hive.  Loading this class
+ * with the isolated classloader however will result in it only being visible as a [[HiveClient]],
+ * not a [[HiveClientImpl]].
+ *
+ * This class needs to interact with multiple versions of Hive, but will always be compiled with
+ * the 'native', execution version of Hive.  Therefore, any places where hive breaks compatibility
+ * must use reflection after matching on `version`.
+ *
+ * @param version the version of hive used when picking function calls that are not compatible.
+ * @param config  a collection of configuration options that will be added to the hive conf before
+ *                opening the hive client.
+ * @param initClassLoader the classloader used when creating the `state` field of
+ *                        this [[HiveClientImpl]].
+ */
+private[hive] class HiveClientImpl(
+    override val version: HiveVersion,
+    config: Map[String, String],
+    initClassLoader: ClassLoader,
+    val clientLoader: IsolatedClientLoader)
+  extends HiveClient
+  with Logging {
+
+  // Circular buffer to hold what hive prints to STDOUT and ERR.  Only printed when failures occur.
+  private val outputBuffer = new CircularBuffer()
+
+  private val shim = version match {
+    case hive.v12 => new Shim_v0_12()
+    case hive.v13 => new Shim_v0_13()
+    case hive.v14 => new Shim_v0_14()
+    case hive.v1_0 => new Shim_v1_0()
+    case hive.v1_1 => new Shim_v1_1()
+    case hive.v1_2 => new Shim_v1_2()
+  }
+
+  // Create an internal session state for this HiveClientImpl.
+  val state = {
+    val original = Thread.currentThread().getContextClassLoader
+    // Switch to the initClassLoader.
+    Thread.currentThread().setContextClassLoader(initClassLoader)
+
+    // Set up kerberos credentials for UserGroupInformation.loginUser within
+    // current class loader
+    // Instead of using the spark conf of the current spark context, a new
+    // instance of SparkConf is needed for the original value of spark.yarn.keytab
+    // and spark.yarn.principal set in SparkSubmit, as yarn.Client resets the
+    // keytab configuration for the link name in distributed cache
+    val sparkConf = new SparkConf
+    if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
+      val principalName = sparkConf.get("spark.yarn.principal")
+      val keytabFileName = sparkConf.get("spark.yarn.keytab")
+      if (!new File(keytabFileName).exists()) {
+        throw new SparkException(s"Keytab file: ${keytabFileName}" +
+          " specified in spark.yarn.keytab does not exist")
+      } else {
+        logInfo("Attempting to login to Kerberos" +
+          s" using principal: ${principalName} and keytab: ${keytabFileName}")
+        UserGroupInformation.loginUserFromKeytab(principalName, keytabFileName)
+      }
+    }
+
+    val ret = try {
+      val initialConf = new HiveConf(classOf[SessionState])
+      // HiveConf is a Hadoop Configuration, which has a field of classLoader and
+      // the initial value will be the current thread's context class loader
+      // (i.e. initClassLoader at here).
+      // We call initialConf.setClassLoader(initClassLoader) at here to make
+      // this action explicit.
+      initialConf.setClassLoader(initClassLoader)
+      config.foreach { case (k, v) =>
+        if (k.toLowerCase.contains("password")) {
+          logDebug(s"Hive Config: $k=xxx")
+        } else {
+          logDebug(s"Hive Config: $k=$v")
+        }
+        initialConf.set(k, v)
+      }
+      val state = new SessionState(initialConf)
+      if (clientLoader.cachedHive != null) {
+        Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
+      }
+      SessionState.start(state)
+      state.out = new PrintStream(outputBuffer, true, "UTF-8")
+      state.err = new PrintStream(outputBuffer, true, "UTF-8")
+      state
+    } finally {
+      Thread.currentThread().setContextClassLoader(original)
+    }
+    ret
+  }
+
+  /** Returns the configuration for the current session. */
+  def conf: HiveConf = SessionState.get().getConf
+
+  override def getConf(key: String, defaultValue: String): String = {
+    conf.get(key, defaultValue)
+  }
+
+  // We use hive's conf for compatibility.
+  private val retryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES)
+  private val retryDelayMillis = shim.getMetastoreClientConnectRetryDelayMillis(conf)
+
+  /**
+   * Runs `f` with multiple retries in case the hive metastore is temporarily unreachable.
+   */
+  private def retryLocked[A](f: => A): A = clientLoader.synchronized {
+    // Hive sometimes retries internally, so set a deadline to avoid compounding delays.
+    val deadline = System.nanoTime + (retryLimit * retryDelayMillis * 1e6).toLong
+    var numTries = 0
+    var caughtException: Exception = null
+    do {
+      numTries += 1
+      try {
+        return f
+      } catch {
+        case e: Exception if causedByThrift(e) =>
+          caughtException = e
+          logWarning(
+            "HiveClient got thrift exception, destroying client and retrying " +
+              s"(${retryLimit - numTries} tries remaining)", e)
+          clientLoader.cachedHive = null
+          Thread.sleep(retryDelayMillis)
+      }
+    } while (numTries <= retryLimit && System.nanoTime < deadline)
+    if (System.nanoTime > deadline) {
+      logWarning("Deadline exceeded")
+    }
+    throw caughtException
+  }
+
+  private def causedByThrift(e: Throwable): Boolean = {
+    var target = e
+    while (target != null) {
+      val msg = target.getMessage()
+      if (msg != null && msg.matches("(?s).*(TApplication|TProtocol|TTransport)Exception.*")) {
+        return true
+      }
+      target = target.getCause()
+    }
+    false
+  }
+
+  def client: Hive = {
+    if (clientLoader.cachedHive != null) {
+      clientLoader.cachedHive.asInstanceOf[Hive]
+    } else {
+      val c = Hive.get(conf)
+      clientLoader.cachedHive = c
+      c
+    }
+  }
+
+  /**
+   * Runs `f` with ThreadLocal session state and classloaders configured for this version of hive.
+   */
+  def withHiveState[A](f: => A): A = retryLocked {
+    val original = Thread.currentThread().getContextClassLoader
+    // Set the thread local metastore client to the client associated with this HiveClientImpl.
+    Hive.set(client)
+    // The classloader in clientLoader could be changed after addJar, always use the latest
+    // classloader
+    state.getConf.setClassLoader(clientLoader.classLoader)
+    // setCurrentSessionState will use the classLoader associated
+    // with the HiveConf in `state` to override the context class loader of the current
+    // thread.
+    shim.setCurrentSessionState(state)
+    val ret = try f finally {
+      Thread.currentThread().setContextClassLoader(original)
+    }
+    ret
+  }
+
+  def setOut(stream: PrintStream): Unit = withHiveState {
+    state.out = stream
+  }
+
+  def setInfo(stream: PrintStream): Unit = withHiveState {
+    state.info = stream
+  }
+
+  def setError(stream: PrintStream): Unit = withHiveState {
+    state.err = stream
+  }
+
+  override def currentDatabase: String = withHiveState {
+    state.getCurrentDatabase
+  }
+
+  override def setCurrentDatabase(databaseName: String): Unit = withHiveState {
+    if (getDatabaseOption(databaseName).isDefined) {
+      state.setCurrentDatabase(databaseName)
+    } else {
+      throw new NoSuchDatabaseException
+    }
+  }
+
+  override def createDatabase(database: HiveDatabase): Unit = withHiveState {
+    client.createDatabase(
+      new Database(
+        database.name,
+        "",
+        new File(database.location).toURI.toString,
+        new java.util.HashMap),
+        true)
+  }
+
+  override def getDatabaseOption(name: String): Option[HiveDatabase] = withHiveState {
+    Option(client.getDatabase(name)).map { d =>
+      HiveDatabase(
+        name = d.getName,
+        location = d.getLocationUri)
+    }
+  }
+
+  override def getTableOption(
+      dbName: String,
+      tableName: String): Option[HiveTable] = withHiveState {
+
+    logDebug(s"Looking up $dbName.$tableName")
+
+    val hiveTable = Option(client.getTable(dbName, tableName, false))
+    val converted = hiveTable.map { h =>
+
+      HiveTable(
+        name = h.getTableName,
+        specifiedDatabase = Option(h.getDbName),
+        schema = h.getCols.asScala.map(f => HiveColumn(f.getName, f.getType, f.getComment)),
+        partitionColumns = h.getPartCols.asScala.map(f =>
+          HiveColumn(f.getName, f.getType, f.getComment)),
+        properties = h.getParameters.asScala.toMap,
+        serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap,
+        tableType = h.getTableType match {
+          case HTableType.MANAGED_TABLE => ManagedTable
+          case HTableType.EXTERNAL_TABLE => ExternalTable
+          case HTableType.VIRTUAL_VIEW => VirtualView
+          case HTableType.INDEX_TABLE => IndexTable
+        },
+        location = shim.getDataLocation(h),
+        inputFormat = Option(h.getInputFormatClass).map(_.getName),
+        outputFormat = Option(h.getOutputFormatClass).map(_.getName),
+        serde = Option(h.getSerializationLib),
+        viewText = Option(h.getViewExpandedText)).withClient(this)
+    }
+    converted
+  }
+
+  private def toInputFormat(name: String) =
+    Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
+
+  private def toOutputFormat(name: String) =
+    Utils.classForName(name)
+      .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
+
+  private def toQlTable(table: HiveTable): metadata.Table = {
+    val qlTable = new metadata.Table(table.database, table.name)
+
+    qlTable.setFields(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
+    qlTable.setPartCols(
+      table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
+    table.properties.foreach { case (k, v) => qlTable.setProperty(k, v) }
+    table.serdeProperties.foreach { case (k, v) => qlTable.setSerdeParam(k, v) }
+
+    // set owner
+    qlTable.setOwner(conf.getUser)
+    // set create time
+    qlTable.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
+
+    table.location.foreach { loc => shim.setDataLocation(qlTable, loc) }
+    table.inputFormat.map(toInputFormat).foreach(qlTable.setInputFormatClass)
+    table.outputFormat.map(toOutputFormat).foreach(qlTable.setOutputFormatClass)
+    table.serde.foreach(qlTable.setSerializationLib)
+
+    qlTable
+  }
+
+  private def toViewTable(view: HiveTable): metadata.Table = {
+    // TODO: this is duplicated with `toQlTable` except the table type stuff.
+    val tbl = new metadata.Table(view.database, view.name)
+    tbl.setTableType(HTableType.VIRTUAL_VIEW)
+    tbl.setSerializationLib(null)
+    tbl.clearSerDeInfo()
+
+    // TODO: we will save the same SQL string to original and expanded text, which is different
+    // from Hive.
+    tbl.setViewOriginalText(view.viewText.get)
+    tbl.setViewExpandedText(view.viewText.get)
+
+    tbl.setFields(view.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
+    view.properties.foreach { case (k, v) => tbl.setProperty(k, v) }
+
+    // set owner
+    tbl.setOwner(conf.getUser)
+    // set create time
+    tbl.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
+
+    tbl
+  }
+
+  override def createView(view: HiveTable): Unit = withHiveState {
+    client.createTable(toViewTable(view))
+  }
+
+  override def alertView(view: HiveTable): Unit = withHiveState {
+    client.alterTable(view.qualifiedName, toViewTable(view))
+  }
+
+  override def createTable(table: HiveTable): Unit = withHiveState {
+    val qlTable = toQlTable(table)
+    client.createTable(qlTable)
+  }
+
+  override def alterTable(table: HiveTable): Unit = withHiveState {
+    val qlTable = toQlTable(table)
+    client.alterTable(table.qualifiedName, qlTable)
+  }
+
+  private def toHivePartition(partition: metadata.Partition): HivePartition = {
+    val apiPartition = partition.getTPartition
+    HivePartition(
+      values = Option(apiPartition.getValues).map(_.asScala).getOrElse(Seq.empty),
+      storage = HiveStorageDescriptor(
+        location = apiPartition.getSd.getLocation,
+        inputFormat = apiPartition.getSd.getInputFormat,
+        outputFormat = apiPartition.getSd.getOutputFormat,
+        serde = apiPartition.getSd.getSerdeInfo.getSerializationLib,
+        serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap))
+  }
+
+  override def getPartitionOption(
+      table: HiveTable,
+      partitionSpec: JMap[String, String]): Option[HivePartition] = withHiveState {
+
+    val qlTable = toQlTable(table)
+    val qlPartition = client.getPartition(qlTable, partitionSpec, false)
+    Option(qlPartition).map(toHivePartition)
+  }
+
+  override def getAllPartitions(hTable: HiveTable): Seq[HivePartition] = withHiveState {
+    val qlTable = toQlTable(hTable)
+    shim.getAllPartitions(client, qlTable).map(toHivePartition)
+  }
+
+  override def getPartitionsByFilter(
+      hTable: HiveTable,
+      predicates: Seq[Expression]): Seq[HivePartition] = withHiveState {
+    val qlTable = toQlTable(hTable)
+    shim.getPartitionsByFilter(client, qlTable, predicates).map(toHivePartition)
+  }
+
+  override def listTables(dbName: String): Seq[String] = withHiveState {
+    client.getAllTables(dbName).asScala
+  }
+
+  /**
+   * Runs the specified SQL query using Hive.
+   */
+  override def runSqlHive(sql: String): Seq[String] = {
+    val maxResults = 100000
+    val results = runHive(sql, maxResults)
+    // It is very confusing when you only get back some of the results...
+    if (results.size == maxResults) sys.error("RESULTS POSSIBLY TRUNCATED")
+    results
+  }
+
+  /**
+   * Execute the command using Hive and return the results as a sequence. Each element
+   * in the sequence is one row.
+   */
+  protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = withHiveState {
+    logDebug(s"Running hiveql '$cmd'")
+    if (cmd.toLowerCase.startsWith("set")) { logDebug(s"Changing config: $cmd") }
+    try {
+      val cmd_trimmed: String = cmd.trim()
+      val tokens: Array[String] = cmd_trimmed.split("\\s+")
+      // The remainder of the command.
+      val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
+      val proc = shim.getCommandProcessor(tokens(0), conf)
+      proc match {
+        case driver: Driver =>
+          val response: CommandProcessorResponse = driver.run(cmd)
+          // Throw an exception if there is an error in query processing.
+          if (response.getResponseCode != 0) {
+            driver.close()
+            throw new QueryExecutionException(response.getErrorMessage)
+          }
+          driver.setMaxRows(maxRows)
+
+          val results = shim.getDriverResults(driver)
+          driver.close()
+          results
+
+        case _ =>
+          if (state.out != null) {
+            // scalastyle:off println
+            state.out.println(tokens(0) + " " + cmd_1)
+            // scalastyle:on println
+          }
+          Seq(proc.run(cmd_1).getResponseCode.toString)
+      }
+    } catch {
+      case e: Exception =>
+        logError(
+          s"""
+            |======================
+            |HIVE FAILURE OUTPUT
+            |======================
+            |${outputBuffer.toString}
+            |======================
+            |END HIVE FAILURE OUTPUT
+            |======================
+          """.stripMargin)
+        throw e
+    }
+  }
+
+  def loadPartition(
+      loadPath: String,
+      tableName: String,
+      partSpec: java.util.LinkedHashMap[String, String],
+      replace: Boolean,
+      holdDDLTime: Boolean,
+      inheritTableSpecs: Boolean,
+      isSkewedStoreAsSubdir: Boolean): Unit = withHiveState {
+    shim.loadPartition(
+      client,
+      new Path(loadPath), // TODO: Use URI
+      tableName,
+      partSpec,
+      replace,
+      holdDDLTime,
+      inheritTableSpecs,
+      isSkewedStoreAsSubdir)
+  }
+
+  def loadTable(
+      loadPath: String, // TODO URI
+      tableName: String,
+      replace: Boolean,
+      holdDDLTime: Boolean): Unit = withHiveState {
+    shim.loadTable(
+      client,
+      new Path(loadPath),
+      tableName,
+      replace,
+      holdDDLTime)
+  }
+
+  def loadDynamicPartitions(
+      loadPath: String,
+      tableName: String,
+      partSpec: java.util.LinkedHashMap[String, String],
+      replace: Boolean,
+      numDP: Int,
+      holdDDLTime: Boolean,
+      listBucketingEnabled: Boolean): Unit = withHiveState {
+    shim.loadDynamicPartitions(
+      client,
+      new Path(loadPath),
+      tableName,
+      partSpec,
+      replace,
+      numDP,
+      holdDDLTime,
+      listBucketingEnabled)
+  }
+
+  def addJar(path: String): Unit = {
+    val uri = new Path(path).toUri
+    val jarURL = if (uri.getScheme == null) {
+      // `path` is a local file path without a URL scheme
+      new File(path).toURI.toURL
+    } else {
+      // `path` is a URL with a scheme
+      uri.toURL
+    }
+    clientLoader.addJar(jarURL)
+    runSqlHive(s"ADD JAR $path")
+  }
+
+  def newSession(): HiveClientImpl = {
+    clientLoader.createClient().asInstanceOf[HiveClientImpl]
+  }
+
+  def reset(): Unit = withHiveState {
+    client.getAllTables("default").asScala.foreach { t =>
+        logDebug(s"Deleting table $t")
+        val table = client.getTable("default", t)
+        client.getIndexes("default", t, 255).asScala.foreach { index =>
+          shim.dropIndex(client, "default", t, index.getIndexName)
+        }
+        if (!table.isIndexTable) {
+          client.dropTable("default", t)
+        }
+      }
+      client.getAllDatabases.asScala.filterNot(_ == "default").foreach { db =>
+        logDebug(s"Dropping Database: $db")
+        client.dropDatabase(db, true, false, true)
+      }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index ca636b0..70c10be 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -38,8 +38,8 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.{IntegralType, StringType}
 
 /**
- * A shim that defines the interface between ClientWrapper and the underlying Hive library used to
- * talk to the metastore. Each Hive version has its own implementation of this class, defining
+ * A shim that defines the interface between [[HiveClientImpl]] and the underlying Hive library used
+ * to talk to the metastore. Each Hive version has its own implementation of this class, defining
  * version-specific version of needed functions.
  *
  * The guideline for writing shims is:
@@ -52,7 +52,6 @@ private[client] sealed abstract class Shim {
   /**
    * Set the current SessionState to the given SessionState. Also, set the context classloader of
    * the current thread to the one set in the HiveConf of this given `state`.
-   * @param state
    */
   def setCurrentSessionState(state: SessionState): Unit
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 010051d..dca7396 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -124,15 +124,15 @@ private[hive] object IsolatedClientLoader extends Logging {
 }
 
 /**
- * Creates a Hive `ClientInterface` using a classloader that works according to the following rules:
+ * Creates a [[HiveClient]] using a classloader that works according to the following rules:
  *  - Shared classes: Java, Scala, logging, and Spark classes are delegated to `baseClassLoader`
- *    allowing the results of calls to the `ClientInterface` to be visible externally.
+ *    allowing the results of calls to the [[HiveClient]] to be visible externally.
  *  - Hive classes: new instances are loaded from `execJars`.  These classes are not
  *    accessible externally due to their custom loading.
- *  - ClientWrapper: a new copy is created for each instance of `IsolatedClassLoader`.
+ *  - [[HiveClientImpl]]: a new copy is created for each instance of `IsolatedClassLoader`.
  *    This new instance is able to see a specific version of hive without using reflection. Since
  *    this is a unique instance, it is not visible externally other than as a generic
- *    `ClientInterface`, unless `isolationOn` is set to `false`.
+ *    [[HiveClient]], unless `isolationOn` is set to `false`.
  *
  * @param version The version of hive on the classpath.  used to pick specific function signatures
  *                that are not compatible across versions.
@@ -179,7 +179,7 @@ private[hive] class IsolatedClientLoader(
 
   /** True if `name` refers to a spark class that must see specific version of Hive. */
   protected def isBarrierClass(name: String): Boolean =
-    name.startsWith(classOf[ClientWrapper].getName) ||
+    name.startsWith(classOf[HiveClientImpl].getName) ||
     name.startsWith(classOf[Shim].getName) ||
     barrierPrefixes.exists(name.startsWith)
 
@@ -233,9 +233,9 @@ private[hive] class IsolatedClientLoader(
   }
 
   /** The isolated client interface to Hive. */
-  private[hive] def createClient(): ClientInterface = {
+  private[hive] def createClient(): HiveClient = {
     if (!isolationOn) {
-      return new ClientWrapper(version, config, baseClassLoader, this)
+      return new HiveClientImpl(version, config, baseClassLoader, this)
     }
     // Pre-reflective instantiation setup.
     logDebug("Initializing the logger to avoid disaster...")
@@ -244,10 +244,10 @@ private[hive] class IsolatedClientLoader(
 
     try {
       classLoader
-        .loadClass(classOf[ClientWrapper].getName)
+        .loadClass(classOf[HiveClientImpl].getName)
         .getConstructors.head
         .newInstance(version, config, classLoader, this)
-        .asInstanceOf[ClientInterface]
+        .asInstanceOf[HiveClient]
     } catch {
       case e: InvocationTargetException =>
         if (e.getCause().isInstanceOf[NoClassDefFoundError]) {

http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 56cab1a..d5ed838 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -38,13 +38,13 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.util.sequenceOption
 import org.apache.spark.sql.hive.HiveShim._
-import org.apache.spark.sql.hive.client.ClientWrapper
+import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.types._
 
 
 private[hive] class HiveFunctionRegistry(
     underlying: analysis.FunctionRegistry,
-    executionHive: ClientWrapper)
+    executionHive: HiveClientImpl)
   extends analysis.FunctionRegistry with HiveInspectors {
 
   def getFunctionInfo(name: String): FunctionInfo = {

http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index a33223a..246108e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.CacheTableCommand
 import org.apache.spark.sql.hive._
-import org.apache.spark.sql.hive.client.ClientWrapper
+import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.hive.execution.HiveNativeCommand
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 
@@ -458,7 +458,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
     org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), this.executionHive)
 }
 
-private[hive] class TestHiveFunctionRegistry(fr: SimpleFunctionRegistry, client: ClientWrapper)
+private[hive] class TestHiveFunctionRegistry(fr: SimpleFunctionRegistry, client: HiveClientImpl)
   extends HiveFunctionRegistry(fr, client) {
 
   private val removedFunctions =

http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index ff10a25..1344a2c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.tags.ExtendedHiveTest
 import org.apache.spark.util.Utils
 
 /**
- * A simple set of tests that call the methods of a hive ClientInterface, loading different version
+ * A simple set of tests that call the methods of a [[HiveClient]], loading different versions
  * of hive from maven central.  These tests are simple in that they are mostly just testing to make
  * sure that reflective calls are not throwing NoSuchMethod error, but the actually functionality
  * is not fully tested.
@@ -101,7 +101,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
 
   private val versions = Seq("12", "13", "14", "1.0.0", "1.1.0", "1.2.0")
 
-  private var client: ClientInterface = null
+  private var client: HiveClient = null
 
   versions.foreach { version =>
     test(s"$version: create client") {

http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 0d62d79..1ada2e3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -199,7 +199,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       "Extended Usage")
 
     checkExistence(sql("describe functioN abcadf"), true,
-      "Function: abcadf is not found.")
+      "Function: abcadf not found.")
 
     checkExistence(sql("describe functioN  `~`"), true,
       "Function: ~",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org