Posted to commits@spark.apache.org by rx...@apache.org on 2016/01/30 01:57:37 UTC
spark git commit: [SPARK-13076][SQL] Rename ClientInterface -> HiveClient
Repository: spark
Updated Branches:
refs/heads/master e38b0baa3 -> 2cbc41282
[SPARK-13076][SQL] Rename ClientInterface -> HiveClient
And ClientWrapper -> HiveClientImpl.
I have some follow-up pull requests to introduce a new internal catalog, and I think this new naming better reflects the functionality of the two classes.
Author: Reynold Xin <rx...@databricks.com>
Closes #10981 from rxin/SPARK-13076.
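For reference, the shape of the split that the rename preserves. This is a minimal, self-contained sketch; the real trait carries roughly thirty members and lives in org.apache.spark.sql.hive.client (see the full files in the diff below), and the trivial bodies here are invented for illustration:

    // Simplified rendering of the renamed trait/implementation pair;
    // not the full Spark API.
    trait HiveClient {                        // was: ClientInterface
      def currentDatabase: String
      def listTables(dbName: String): Seq[String]
      def newSession(): HiveClient
    }

    class HiveClientImpl extends HiveClient { // was: ClientWrapper
      def currentDatabase: String = "default"
      def listTables(dbName: String): Seq[String] = Seq.empty
      def newSession(): HiveClient = new HiveClientImpl
    }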
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2cbc4128
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2cbc4128
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2cbc4128
Branch: refs/heads/master
Commit: 2cbc412821641cf9446c0621ffa1976bd7fc4fa1
Parents: e38b0ba
Author: Reynold Xin <rx...@databricks.com>
Authored: Fri Jan 29 16:57:34 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Fri Jan 29 16:57:34 2016 -0800
----------------------------------------------------------------------
.../apache/spark/sql/execution/commands.scala | 2 +-
.../org/apache/spark/sql/SQLQuerySuite.scala | 2 +-
.../org/apache/spark/sql/hive/HiveContext.scala | 10 +-
.../spark/sql/hive/HiveMetastoreCatalog.scala | 2 +-
.../spark/sql/hive/client/ClientInterface.scala | 195 -------
.../spark/sql/hive/client/ClientWrapper.scala | 544 -------------------
.../spark/sql/hive/client/HiveClient.scala | 195 +++++++
.../spark/sql/hive/client/HiveClientImpl.scala | 544 +++++++++++++++++++
.../apache/spark/sql/hive/client/HiveShim.scala | 5 +-
.../sql/hive/client/IsolatedClientLoader.scala | 18 +-
.../org/apache/spark/sql/hive/hiveUDFs.scala | 4 +-
.../apache/spark/sql/hive/test/TestHive.scala | 4 +-
.../spark/sql/hive/client/VersionsSuite.scala | 4 +-
.../sql/hive/execution/SQLQuerySuite.scala | 2 +-
14 files changed, 765 insertions(+), 766 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 703e464..c6adb58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -404,7 +404,7 @@ case class DescribeFunction(
result
}
- case None => Seq(Row(s"Function: $functionName is not found."))
+ case None => Seq(Row(s"Function: $functionName not found."))
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 51a50c1..2b821c1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -84,7 +84,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
"Extended Usage")
checkExistence(sql("describe functioN abcadf"), true,
- "Function: abcadf is not found.")
+ "Function: abcadf not found.")
}
test("SPARK-6743: no columns from cache") {
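The two hunks above are both sides of one string change: DescribeFunction now reports "Function: <name> not found." for unknown functions, and the suite asserts the new text. A minimal reproduction, assuming a SQLContext named sqlContext is in scope (DESCRIBE FUNCTION returns rows with a single string column):

    // Sketch only; assumes `sqlContext: SQLContext`.
    val rows = sqlContext.sql("DESCRIBE FUNCTION abcadf").collect()
    assert(rows.exists(_.getString(0).contains("Function: abcadf not found.")))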
http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 1797ea5..05863ae 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -79,8 +79,8 @@ class HiveContext private[hive](
sc: SparkContext,
cacheManager: CacheManager,
listener: SQLListener,
- @transient private val execHive: ClientWrapper,
- @transient private val metaHive: ClientInterface,
+ @transient private val execHive: HiveClientImpl,
+ @transient private val metaHive: HiveClient,
isRootContext: Boolean)
extends SQLContext(sc, cacheManager, listener, isRootContext) with Logging {
self =>
@@ -193,7 +193,7 @@ class HiveContext private[hive](
* for storing persistent metadata, and only point to a dummy metastore in a temporary directory.
*/
@transient
- protected[hive] lazy val executionHive: ClientWrapper = if (execHive != null) {
+ protected[hive] lazy val executionHive: HiveClientImpl = if (execHive != null) {
execHive
} else {
logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
@@ -203,7 +203,7 @@ class HiveContext private[hive](
config = newTemporaryConfiguration(useInMemoryDerby = true),
isolationOn = false,
baseClassLoader = Utils.getContextOrSparkClassLoader)
- loader.createClient().asInstanceOf[ClientWrapper]
+ loader.createClient().asInstanceOf[HiveClientImpl]
}
/**
@@ -222,7 +222,7 @@ class HiveContext private[hive](
* in the hive-site.xml file.
*/
@transient
- protected[hive] lazy val metadataHive: ClientInterface = if (metaHive != null) {
+ protected[hive] lazy val metadataHive: HiveClient = if (metaHive != null) {
metaHive
} else {
val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)
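The asymmetric types above are why both new names matter: executionHive always runs Spark's built-in Hive version, so it can be used as the concrete HiveClientImpl, while metadataHive may sit behind an isolated classloader and is therefore visible only through the shared HiveClient trait. A toy model of that asymmetry (all names below are invented for illustration, not Spark's actual code):

    trait HiveClient { def getConf(key: String, default: String): String }

    class HiveClientImpl extends HiveClient {
      def getConf(key: String, default: String): String = default
      def implOnlyHelper(): Unit = ()  // impl-specific member: needs the concrete type
    }

    class ToyHiveContext(execHive: HiveClientImpl, metaHive: HiveClient) {
      def warmUp(): Unit = execHive.implOnlyHelper()                  // concrete impl
      def uris: String = metaHive.getConf("hive.metastore.uris", "")  // shared trait
    }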
http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 848aa4e..61d0d67 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -96,7 +96,7 @@ private[hive] object HiveSerDe {
}
}
-private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: HiveContext)
+private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveContext)
extends Catalog with Logging {
val conf = hive.conf
http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
deleted file mode 100644
index 4eec3fe..0000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.client
-
-import java.io.PrintStream
-import java.util.{Map => JMap}
-import javax.annotation.Nullable
-
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
-import org.apache.spark.sql.catalyst.expressions.Expression
-
-private[hive] case class HiveDatabase(name: String, location: String)
-
-private[hive] abstract class TableType { val name: String }
-private[hive] case object ExternalTable extends TableType { override val name = "EXTERNAL_TABLE" }
-private[hive] case object IndexTable extends TableType { override val name = "INDEX_TABLE" }
-private[hive] case object ManagedTable extends TableType { override val name = "MANAGED_TABLE" }
-private[hive] case object VirtualView extends TableType { override val name = "VIRTUAL_VIEW" }
-
-// TODO: Use this for Tables and Partitions
-private[hive] case class HiveStorageDescriptor(
- location: String,
- inputFormat: String,
- outputFormat: String,
- serde: String,
- serdeProperties: Map[String, String])
-
-private[hive] case class HivePartition(
- values: Seq[String],
- storage: HiveStorageDescriptor)
-
-private[hive] case class HiveColumn(name: String, @Nullable hiveType: String, comment: String)
-private[hive] case class HiveTable(
- specifiedDatabase: Option[String],
- name: String,
- schema: Seq[HiveColumn],
- partitionColumns: Seq[HiveColumn],
- properties: Map[String, String],
- serdeProperties: Map[String, String],
- tableType: TableType,
- location: Option[String] = None,
- inputFormat: Option[String] = None,
- outputFormat: Option[String] = None,
- serde: Option[String] = None,
- viewText: Option[String] = None) {
-
- @transient
- private[client] var client: ClientInterface = _
-
- private[client] def withClient(ci: ClientInterface): this.type = {
- client = ci
- this
- }
-
- def database: String = specifiedDatabase.getOrElse(sys.error("database not resolved"))
-
- def isPartitioned: Boolean = partitionColumns.nonEmpty
-
- def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this)
-
- def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
- client.getPartitionsByFilter(this, predicates)
-
- // Hive does not support backticks when passing names to the client.
- def qualifiedName: String = s"$database.$name"
-}
-
-/**
- * An externally visible interface to the Hive client. This interface is shared across both the
- * internal and external classloaders for a given version of Hive and thus must expose only
- * shared classes.
- */
-private[hive] trait ClientInterface {
-
- /** Returns the Hive Version of this client. */
- def version: HiveVersion
-
- /** Returns the configuration for the given key in the current session. */
- def getConf(key: String, defaultValue: String): String
-
- /**
- * Runs a HiveQL command using Hive, returning the results as a list of strings. Each row will
- * result in one string.
- */
- def runSqlHive(sql: String): Seq[String]
-
- def setOut(stream: PrintStream): Unit
- def setInfo(stream: PrintStream): Unit
- def setError(stream: PrintStream): Unit
-
- /** Returns the names of all tables in the given database. */
- def listTables(dbName: String): Seq[String]
-
- /** Returns the name of the active database. */
- def currentDatabase: String
-
- /** Sets the name of the current database. */
- def setCurrentDatabase(databaseName: String): Unit
-
- /** Returns the metadata for the specified database, throwing an exception if it doesn't exist. */
- def getDatabase(name: String): HiveDatabase = {
- getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException)
- }
-
- /** Returns the metadata for a given database, or None if it doesn't exist. */
- def getDatabaseOption(name: String): Option[HiveDatabase]
-
- /** Returns the specified table, or throws [[NoSuchTableException]]. */
- def getTable(dbName: String, tableName: String): HiveTable = {
- getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException)
- }
-
- /** Returns the metadata for the specified table or None if it doesn't exist. */
- def getTableOption(dbName: String, tableName: String): Option[HiveTable]
-
- /** Creates a view with the given metadata. */
- def createView(view: HiveTable): Unit
-
- /** Updates the given view with new metadata. */
- def alertView(view: HiveTable): Unit
-
- /** Creates a table with the given metadata. */
- def createTable(table: HiveTable): Unit
-
- /** Updates the given table with new metadata. */
- def alterTable(table: HiveTable): Unit
-
- /** Creates a new database with the given name. */
- def createDatabase(database: HiveDatabase): Unit
-
- /** Returns the specified partition or None if it does not exist. */
- def getPartitionOption(
- hTable: HiveTable,
- partitionSpec: JMap[String, String]): Option[HivePartition]
-
- /** Returns all partitions for the given table. */
- def getAllPartitions(hTable: HiveTable): Seq[HivePartition]
-
- /** Returns partitions filtered by predicates for the given table. */
- def getPartitionsByFilter(hTable: HiveTable, predicates: Seq[Expression]): Seq[HivePartition]
-
- /** Loads a static partition into an existing table. */
- def loadPartition(
- loadPath: String,
- tableName: String,
- partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
- replace: Boolean,
- holdDDLTime: Boolean,
- inheritTableSpecs: Boolean,
- isSkewedStoreAsSubdir: Boolean): Unit
-
- /** Loads data into an existing table. */
- def loadTable(
- loadPath: String, // TODO URI
- tableName: String,
- replace: Boolean,
- holdDDLTime: Boolean): Unit
-
- /** Loads new dynamic partitions into an existing table. */
- def loadDynamicPartitions(
- loadPath: String,
- tableName: String,
- partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
- replace: Boolean,
- numDP: Int,
- holdDDLTime: Boolean,
- listBucketingEnabled: Boolean): Unit
-
- /** Adds a jar into the class loader. */
- def addJar(path: String): Unit
-
- /** Returns a ClientInterface as a new session that will share the class loader and Hive client. */
- def newSession(): ClientInterface
-
- /** Run a function within Hive state (SessionState, HiveConf, Hive client and class loader) */
- def withHiveState[A](f: => A): A
-
- /** Used for testing only. Removes all metadata from this instance of Hive. */
- def reset(): Unit
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
deleted file mode 100644
index 5307e92..0000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ /dev/null
@@ -1,544 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.client
-
-import java.io.{File, PrintStream}
-import java.util.{Map => JMap}
-
-import scala.collection.JavaConverters._
-import scala.language.reflectiveCalls
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.{TableType => HTableType}
-import org.apache.hadoop.hive.metastore.api.{Database, FieldSchema}
-import org.apache.hadoop.hive.ql.{metadata, Driver}
-import org.apache.hadoop.hive.ql.metadata.Hive
-import org.apache.hadoop.hive.ql.processors._
-import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
-import org.apache.hadoop.security.UserGroupInformation
-
-import org.apache.spark.{Logging, SparkConf, SparkException}
-import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.util.{CircularBuffer, Utils}
-
-/**
- * A class that wraps the HiveClient and converts its responses to externally visible classes.
- * Note that this class is typically loaded with an internal classloader for each instantiation,
- * allowing it to interact directly with a specific isolated version of Hive. Loading this class
- * with the isolated classloader however will result in it only being visible as a ClientInterface,
- * not a ClientWrapper.
- *
- * This class needs to interact with multiple versions of Hive, but will always be compiled with
- * the 'native', execution version of Hive. Therefore, any places where hive breaks compatibility
- * must use reflection after matching on `version`.
- *
- * @param version the version of hive used when picking function calls that are not compatible.
- * @param config a collection of configuration options that will be added to the hive conf before
- * opening the hive client.
- * @param initClassLoader the classloader used when creating the `state` field of
- * this ClientWrapper.
- */
-private[hive] class ClientWrapper(
- override val version: HiveVersion,
- config: Map[String, String],
- initClassLoader: ClassLoader,
- val clientLoader: IsolatedClientLoader)
- extends ClientInterface
- with Logging {
-
- // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
- private val outputBuffer = new CircularBuffer()
-
- private val shim = version match {
- case hive.v12 => new Shim_v0_12()
- case hive.v13 => new Shim_v0_13()
- case hive.v14 => new Shim_v0_14()
- case hive.v1_0 => new Shim_v1_0()
- case hive.v1_1 => new Shim_v1_1()
- case hive.v1_2 => new Shim_v1_2()
- }
-
- // Create an internal session state for this ClientWrapper.
- val state = {
- val original = Thread.currentThread().getContextClassLoader
- // Switch to the initClassLoader.
- Thread.currentThread().setContextClassLoader(initClassLoader)
-
- // Set up kerberos credentials for UserGroupInformation.loginUser within
- // current class loader
- // Instead of using the spark conf of the current spark context, a new
- // instance of SparkConf is needed for the original value of spark.yarn.keytab
- // and spark.yarn.principal set in SparkSubmit, as yarn.Client resets the
- // keytab configuration for the link name in distributed cache
- val sparkConf = new SparkConf
- if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
- val principalName = sparkConf.get("spark.yarn.principal")
- val keytabFileName = sparkConf.get("spark.yarn.keytab")
- if (!new File(keytabFileName).exists()) {
- throw new SparkException(s"Keytab file: ${keytabFileName}" +
- " specified in spark.yarn.keytab does not exist")
- } else {
- logInfo("Attempting to login to Kerberos" +
- s" using principal: ${principalName} and keytab: ${keytabFileName}")
- UserGroupInformation.loginUserFromKeytab(principalName, keytabFileName)
- }
- }
-
- val ret = try {
- val initialConf = new HiveConf(classOf[SessionState])
- // HiveConf is a Hadoop Configuration, which has a field of classLoader and
- // the initial value will be the current thread's context class loader
- // (i.e. initClassLoader here).
- // We call initialConf.setClassLoader(initClassLoader) here to make
- // this action explicit.
- initialConf.setClassLoader(initClassLoader)
- config.foreach { case (k, v) =>
- if (k.toLowerCase.contains("password")) {
- logDebug(s"Hive Config: $k=xxx")
- } else {
- logDebug(s"Hive Config: $k=$v")
- }
- initialConf.set(k, v)
- }
- val state = new SessionState(initialConf)
- if (clientLoader.cachedHive != null) {
- Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
- }
- SessionState.start(state)
- state.out = new PrintStream(outputBuffer, true, "UTF-8")
- state.err = new PrintStream(outputBuffer, true, "UTF-8")
- state
- } finally {
- Thread.currentThread().setContextClassLoader(original)
- }
- ret
- }
-
- /** Returns the configuration for the current session. */
- def conf: HiveConf = SessionState.get().getConf
-
- override def getConf(key: String, defaultValue: String): String = {
- conf.get(key, defaultValue)
- }
-
- // We use hive's conf for compatibility.
- private val retryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES)
- private val retryDelayMillis = shim.getMetastoreClientConnectRetryDelayMillis(conf)
-
- /**
- * Runs `f` with multiple retries in case the hive metastore is temporarily unreachable.
- */
- private def retryLocked[A](f: => A): A = clientLoader.synchronized {
- // Hive sometimes retries internally, so set a deadline to avoid compounding delays.
- val deadline = System.nanoTime + (retryLimit * retryDelayMillis * 1e6).toLong
- var numTries = 0
- var caughtException: Exception = null
- do {
- numTries += 1
- try {
- return f
- } catch {
- case e: Exception if causedByThrift(e) =>
- caughtException = e
- logWarning(
- "HiveClientWrapper got thrift exception, destroying client and retrying " +
- s"(${retryLimit - numTries} tries remaining)", e)
- clientLoader.cachedHive = null
- Thread.sleep(retryDelayMillis)
- }
- } while (numTries <= retryLimit && System.nanoTime < deadline)
- if (System.nanoTime > deadline) {
- logWarning("Deadline exceeded")
- }
- throw caughtException
- }
-
- private def causedByThrift(e: Throwable): Boolean = {
- var target = e
- while (target != null) {
- val msg = target.getMessage()
- if (msg != null && msg.matches("(?s).*(TApplication|TProtocol|TTransport)Exception.*")) {
- return true
- }
- target = target.getCause()
- }
- false
- }
-
- def client: Hive = {
- if (clientLoader.cachedHive != null) {
- clientLoader.cachedHive.asInstanceOf[Hive]
- } else {
- val c = Hive.get(conf)
- clientLoader.cachedHive = c
- c
- }
- }
-
- /**
- * Runs `f` with ThreadLocal session state and classloaders configured for this version of hive.
- */
- def withHiveState[A](f: => A): A = retryLocked {
- val original = Thread.currentThread().getContextClassLoader
- // Set the thread local metastore client to the client associated with this ClientWrapper.
- Hive.set(client)
- // The classloader in clientLoader could be changed after addJar, always use the latest
- // classloader
- state.getConf.setClassLoader(clientLoader.classLoader)
- // setCurrentSessionState will use the classLoader associated
- // with the HiveConf in `state` to override the context class loader of the current
- // thread.
- shim.setCurrentSessionState(state)
- val ret = try f finally {
- Thread.currentThread().setContextClassLoader(original)
- }
- ret
- }
-
- def setOut(stream: PrintStream): Unit = withHiveState {
- state.out = stream
- }
-
- def setInfo(stream: PrintStream): Unit = withHiveState {
- state.info = stream
- }
-
- def setError(stream: PrintStream): Unit = withHiveState {
- state.err = stream
- }
-
- override def currentDatabase: String = withHiveState {
- state.getCurrentDatabase
- }
-
- override def setCurrentDatabase(databaseName: String): Unit = withHiveState {
- if (getDatabaseOption(databaseName).isDefined) {
- state.setCurrentDatabase(databaseName)
- } else {
- throw new NoSuchDatabaseException
- }
- }
-
- override def createDatabase(database: HiveDatabase): Unit = withHiveState {
- client.createDatabase(
- new Database(
- database.name,
- "",
- new File(database.location).toURI.toString,
- new java.util.HashMap),
- true)
- }
-
- override def getDatabaseOption(name: String): Option[HiveDatabase] = withHiveState {
- Option(client.getDatabase(name)).map { d =>
- HiveDatabase(
- name = d.getName,
- location = d.getLocationUri)
- }
- }
-
- override def getTableOption(
- dbName: String,
- tableName: String): Option[HiveTable] = withHiveState {
-
- logDebug(s"Looking up $dbName.$tableName")
-
- val hiveTable = Option(client.getTable(dbName, tableName, false))
- val converted = hiveTable.map { h =>
-
- HiveTable(
- name = h.getTableName,
- specifiedDatabase = Option(h.getDbName),
- schema = h.getCols.asScala.map(f => HiveColumn(f.getName, f.getType, f.getComment)),
- partitionColumns = h.getPartCols.asScala.map(f =>
- HiveColumn(f.getName, f.getType, f.getComment)),
- properties = h.getParameters.asScala.toMap,
- serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap,
- tableType = h.getTableType match {
- case HTableType.MANAGED_TABLE => ManagedTable
- case HTableType.EXTERNAL_TABLE => ExternalTable
- case HTableType.VIRTUAL_VIEW => VirtualView
- case HTableType.INDEX_TABLE => IndexTable
- },
- location = shim.getDataLocation(h),
- inputFormat = Option(h.getInputFormatClass).map(_.getName),
- outputFormat = Option(h.getOutputFormatClass).map(_.getName),
- serde = Option(h.getSerializationLib),
- viewText = Option(h.getViewExpandedText)).withClient(this)
- }
- converted
- }
-
- private def toInputFormat(name: String) =
- Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
-
- private def toOutputFormat(name: String) =
- Utils.classForName(name)
- .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
-
- private def toQlTable(table: HiveTable): metadata.Table = {
- val qlTable = new metadata.Table(table.database, table.name)
-
- qlTable.setFields(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
- qlTable.setPartCols(
- table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
- table.properties.foreach { case (k, v) => qlTable.setProperty(k, v) }
- table.serdeProperties.foreach { case (k, v) => qlTable.setSerdeParam(k, v) }
-
- // set owner
- qlTable.setOwner(conf.getUser)
- // set create time
- qlTable.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
-
- table.location.foreach { loc => shim.setDataLocation(qlTable, loc) }
- table.inputFormat.map(toInputFormat).foreach(qlTable.setInputFormatClass)
- table.outputFormat.map(toOutputFormat).foreach(qlTable.setOutputFormatClass)
- table.serde.foreach(qlTable.setSerializationLib)
-
- qlTable
- }
-
- private def toViewTable(view: HiveTable): metadata.Table = {
- // TODO: this is duplicated with `toQlTable` except the table type stuff.
- val tbl = new metadata.Table(view.database, view.name)
- tbl.setTableType(HTableType.VIRTUAL_VIEW)
- tbl.setSerializationLib(null)
- tbl.clearSerDeInfo()
-
- // TODO: we will save the same SQL string to original and expanded text, which is different
- // from Hive.
- tbl.setViewOriginalText(view.viewText.get)
- tbl.setViewExpandedText(view.viewText.get)
-
- tbl.setFields(view.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
- view.properties.foreach { case (k, v) => tbl.setProperty(k, v) }
-
- // set owner
- tbl.setOwner(conf.getUser)
- // set create time
- tbl.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
-
- tbl
- }
-
- override def createView(view: HiveTable): Unit = withHiveState {
- client.createTable(toViewTable(view))
- }
-
- override def alertView(view: HiveTable): Unit = withHiveState {
- client.alterTable(view.qualifiedName, toViewTable(view))
- }
-
- override def createTable(table: HiveTable): Unit = withHiveState {
- val qlTable = toQlTable(table)
- client.createTable(qlTable)
- }
-
- override def alterTable(table: HiveTable): Unit = withHiveState {
- val qlTable = toQlTable(table)
- client.alterTable(table.qualifiedName, qlTable)
- }
-
- private def toHivePartition(partition: metadata.Partition): HivePartition = {
- val apiPartition = partition.getTPartition
- HivePartition(
- values = Option(apiPartition.getValues).map(_.asScala).getOrElse(Seq.empty),
- storage = HiveStorageDescriptor(
- location = apiPartition.getSd.getLocation,
- inputFormat = apiPartition.getSd.getInputFormat,
- outputFormat = apiPartition.getSd.getOutputFormat,
- serde = apiPartition.getSd.getSerdeInfo.getSerializationLib,
- serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap))
- }
-
- override def getPartitionOption(
- table: HiveTable,
- partitionSpec: JMap[String, String]): Option[HivePartition] = withHiveState {
-
- val qlTable = toQlTable(table)
- val qlPartition = client.getPartition(qlTable, partitionSpec, false)
- Option(qlPartition).map(toHivePartition)
- }
-
- override def getAllPartitions(hTable: HiveTable): Seq[HivePartition] = withHiveState {
- val qlTable = toQlTable(hTable)
- shim.getAllPartitions(client, qlTable).map(toHivePartition)
- }
-
- override def getPartitionsByFilter(
- hTable: HiveTable,
- predicates: Seq[Expression]): Seq[HivePartition] = withHiveState {
- val qlTable = toQlTable(hTable)
- shim.getPartitionsByFilter(client, qlTable, predicates).map(toHivePartition)
- }
-
- override def listTables(dbName: String): Seq[String] = withHiveState {
- client.getAllTables(dbName).asScala
- }
-
- /**
- * Runs the specified SQL query using Hive.
- */
- override def runSqlHive(sql: String): Seq[String] = {
- val maxResults = 100000
- val results = runHive(sql, maxResults)
- // It is very confusing when you only get back some of the results...
- if (results.size == maxResults) sys.error("RESULTS POSSIBLY TRUNCATED")
- results
- }
-
- /**
- * Execute the command using Hive and return the results as a sequence. Each element
- * in the sequence is one row.
- */
- protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = withHiveState {
- logDebug(s"Running hiveql '$cmd'")
- if (cmd.toLowerCase.startsWith("set")) { logDebug(s"Changing config: $cmd") }
- try {
- val cmd_trimmed: String = cmd.trim()
- val tokens: Array[String] = cmd_trimmed.split("\\s+")
- // The remainder of the command.
- val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
- val proc = shim.getCommandProcessor(tokens(0), conf)
- proc match {
- case driver: Driver =>
- val response: CommandProcessorResponse = driver.run(cmd)
- // Throw an exception if there is an error in query processing.
- if (response.getResponseCode != 0) {
- driver.close()
- throw new QueryExecutionException(response.getErrorMessage)
- }
- driver.setMaxRows(maxRows)
-
- val results = shim.getDriverResults(driver)
- driver.close()
- results
-
- case _ =>
- if (state.out != null) {
- // scalastyle:off println
- state.out.println(tokens(0) + " " + cmd_1)
- // scalastyle:on println
- }
- Seq(proc.run(cmd_1).getResponseCode.toString)
- }
- } catch {
- case e: Exception =>
- logError(
- s"""
- |======================
- |HIVE FAILURE OUTPUT
- |======================
- |${outputBuffer.toString}
- |======================
- |END HIVE FAILURE OUTPUT
- |======================
- """.stripMargin)
- throw e
- }
- }
-
- def loadPartition(
- loadPath: String,
- tableName: String,
- partSpec: java.util.LinkedHashMap[String, String],
- replace: Boolean,
- holdDDLTime: Boolean,
- inheritTableSpecs: Boolean,
- isSkewedStoreAsSubdir: Boolean): Unit = withHiveState {
- shim.loadPartition(
- client,
- new Path(loadPath), // TODO: Use URI
- tableName,
- partSpec,
- replace,
- holdDDLTime,
- inheritTableSpecs,
- isSkewedStoreAsSubdir)
- }
-
- def loadTable(
- loadPath: String, // TODO URI
- tableName: String,
- replace: Boolean,
- holdDDLTime: Boolean): Unit = withHiveState {
- shim.loadTable(
- client,
- new Path(loadPath),
- tableName,
- replace,
- holdDDLTime)
- }
-
- def loadDynamicPartitions(
- loadPath: String,
- tableName: String,
- partSpec: java.util.LinkedHashMap[String, String],
- replace: Boolean,
- numDP: Int,
- holdDDLTime: Boolean,
- listBucketingEnabled: Boolean): Unit = withHiveState {
- shim.loadDynamicPartitions(
- client,
- new Path(loadPath),
- tableName,
- partSpec,
- replace,
- numDP,
- holdDDLTime,
- listBucketingEnabled)
- }
-
- def addJar(path: String): Unit = {
- val uri = new Path(path).toUri
- val jarURL = if (uri.getScheme == null) {
- // `path` is a local file path without a URL scheme
- new File(path).toURI.toURL
- } else {
- // `path` is a URL with a scheme
- uri.toURL
- }
- clientLoader.addJar(jarURL)
- runSqlHive(s"ADD JAR $path")
- }
-
- def newSession(): ClientWrapper = {
- clientLoader.createClient().asInstanceOf[ClientWrapper]
- }
-
- def reset(): Unit = withHiveState {
- client.getAllTables("default").asScala.foreach { t =>
- logDebug(s"Deleting table $t")
- val table = client.getTable("default", t)
- client.getIndexes("default", t, 255).asScala.foreach { index =>
- shim.dropIndex(client, "default", t, index.getIndexName)
- }
- if (!table.isIndexTable) {
- client.dropTable("default", t)
- }
- }
- client.getAllDatabases.asScala.filterNot(_ == "default").foreach { db =>
- logDebug(s"Dropping Database: $db")
- client.dropDatabase(db, true, false, true)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
new file mode 100644
index 0000000..f681cc6
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.client
+
+import java.io.PrintStream
+import java.util.{Map => JMap}
+import javax.annotation.Nullable
+
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+private[hive] case class HiveDatabase(name: String, location: String)
+
+private[hive] abstract class TableType { val name: String }
+private[hive] case object ExternalTable extends TableType { override val name = "EXTERNAL_TABLE" }
+private[hive] case object IndexTable extends TableType { override val name = "INDEX_TABLE" }
+private[hive] case object ManagedTable extends TableType { override val name = "MANAGED_TABLE" }
+private[hive] case object VirtualView extends TableType { override val name = "VIRTUAL_VIEW" }
+
+// TODO: Use this for Tables and Partitions
+private[hive] case class HiveStorageDescriptor(
+ location: String,
+ inputFormat: String,
+ outputFormat: String,
+ serde: String,
+ serdeProperties: Map[String, String])
+
+private[hive] case class HivePartition(
+ values: Seq[String],
+ storage: HiveStorageDescriptor)
+
+private[hive] case class HiveColumn(name: String, @Nullable hiveType: String, comment: String)
+private[hive] case class HiveTable(
+ specifiedDatabase: Option[String],
+ name: String,
+ schema: Seq[HiveColumn],
+ partitionColumns: Seq[HiveColumn],
+ properties: Map[String, String],
+ serdeProperties: Map[String, String],
+ tableType: TableType,
+ location: Option[String] = None,
+ inputFormat: Option[String] = None,
+ outputFormat: Option[String] = None,
+ serde: Option[String] = None,
+ viewText: Option[String] = None) {
+
+ @transient
+ private[client] var client: HiveClient = _
+
+ private[client] def withClient(ci: HiveClient): this.type = {
+ client = ci
+ this
+ }
+
+ def database: String = specifiedDatabase.getOrElse(sys.error("database not resolved"))
+
+ def isPartitioned: Boolean = partitionColumns.nonEmpty
+
+ def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this)
+
+ def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
+ client.getPartitionsByFilter(this, predicates)
+
+ // Hive does not support backticks when passing names to the client.
+ def qualifiedName: String = s"$database.$name"
+}
+
+/**
+ * An externally visible interface to the Hive client. This interface is shared across both the
+ * internal and external classloaders for a given version of Hive and thus must expose only
+ * shared classes.
+ */
+private[hive] trait HiveClient {
+
+ /** Returns the Hive Version of this client. */
+ def version: HiveVersion
+
+ /** Returns the configuration for the given key in the current session. */
+ def getConf(key: String, defaultValue: String): String
+
+ /**
+ * Runs a HiveQL command using Hive, returning the results as a list of strings. Each row will
+ * result in one string.
+ */
+ def runSqlHive(sql: String): Seq[String]
+
+ def setOut(stream: PrintStream): Unit
+ def setInfo(stream: PrintStream): Unit
+ def setError(stream: PrintStream): Unit
+
+ /** Returns the names of all tables in the given database. */
+ def listTables(dbName: String): Seq[String]
+
+ /** Returns the name of the active database. */
+ def currentDatabase: String
+
+ /** Sets the name of the current database. */
+ def setCurrentDatabase(databaseName: String): Unit
+
+ /** Returns the metadata for the specified database, throwing an exception if it doesn't exist. */
+ def getDatabase(name: String): HiveDatabase = {
+ getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException)
+ }
+
+ /** Returns the metadata for a given database, or None if it doesn't exist. */
+ def getDatabaseOption(name: String): Option[HiveDatabase]
+
+ /** Returns the specified table, or throws [[NoSuchTableException]]. */
+ def getTable(dbName: String, tableName: String): HiveTable = {
+ getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException)
+ }
+
+ /** Returns the metadata for the specified table or None if it doesn't exist. */
+ def getTableOption(dbName: String, tableName: String): Option[HiveTable]
+
+ /** Creates a view with the given metadata. */
+ def createView(view: HiveTable): Unit
+
+ /** Updates the given view with new metadata. */
+ def alertView(view: HiveTable): Unit
+
+ /** Creates a table with the given metadata. */
+ def createTable(table: HiveTable): Unit
+
+ /** Updates the given table with new metadata. */
+ def alterTable(table: HiveTable): Unit
+
+ /** Creates a new database with the given name. */
+ def createDatabase(database: HiveDatabase): Unit
+
+ /** Returns the specified partition or None if it does not exist. */
+ def getPartitionOption(
+ hTable: HiveTable,
+ partitionSpec: JMap[String, String]): Option[HivePartition]
+
+ /** Returns all partitions for the given table. */
+ def getAllPartitions(hTable: HiveTable): Seq[HivePartition]
+
+ /** Returns partitions filtered by predicates for the given table. */
+ def getPartitionsByFilter(hTable: HiveTable, predicates: Seq[Expression]): Seq[HivePartition]
+
+ /** Loads a static partition into an existing table. */
+ def loadPartition(
+ loadPath: String,
+ tableName: String,
+ partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
+ replace: Boolean,
+ holdDDLTime: Boolean,
+ inheritTableSpecs: Boolean,
+ isSkewedStoreAsSubdir: Boolean): Unit
+
+ /** Loads data into an existing table. */
+ def loadTable(
+ loadPath: String, // TODO URI
+ tableName: String,
+ replace: Boolean,
+ holdDDLTime: Boolean): Unit
+
+ /** Loads new dynamic partitions into an existing table. */
+ def loadDynamicPartitions(
+ loadPath: String,
+ tableName: String,
+ partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
+ replace: Boolean,
+ numDP: Int,
+ holdDDLTime: Boolean,
+ listBucketingEnabled: Boolean): Unit
+
+ /** Adds a jar into the class loader. */
+ def addJar(path: String): Unit
+
+ /** Returns a [[HiveClient]] as a new session that will share the class loader and Hive client. */
+ def newSession(): HiveClient
+
+ /** Run a function within Hive state (SessionState, HiveConf, Hive client and class loader) */
+ def withHiveState[A](f: => A): A
+
+ /** Used for testing only. Removes all metadata from this instance of Hive. */
+ def reset(): Unit
+}
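The Option-returning accessors above pair with throwing convenience wrappers (getTable/getTableOption, getDatabase/getDatabaseOption), and getTableOption attaches the client to the returned HiveTable so partition lookups can be chained. A usage sketch against this private[hive] API, assuming a HiveClient obtained from an IsolatedClientLoader:

    // Sketch only; types are those defined in HiveClient.scala above.
    def printPartitionLocations(client: HiveClient, db: String, tbl: String): Unit =
      client.getTableOption(db, tbl) match {
        case Some(t) if t.isPartitioned =>
          // getAllPartitions delegates to the client attached by getTableOption
          t.getAllPartitions.foreach(p => println(p.storage.location))
        case Some(_) => println(s"$db.$tbl is not partitioned")
        case None    => println(s"$db.$tbl does not exist")
      }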
http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
new file mode 100644
index 0000000..cf1ff55
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.client
+
+import java.io.{File, PrintStream}
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+import scala.language.reflectiveCalls
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.metastore.{TableType => HTableType}
+import org.apache.hadoop.hive.metastore.api.{Database, FieldSchema}
+import org.apache.hadoop.hive.ql.{metadata, Driver}
+import org.apache.hadoop.hive.ql.metadata.Hive
+import org.apache.hadoop.hive.ql.processors._
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.util.{CircularBuffer, Utils}
+
+/**
+ * A class that wraps the HiveClient and converts its responses to externally visible classes.
+ * Note that this class is typically loaded with an internal classloader for each instantiation,
+ * allowing it to interact directly with a specific isolated version of Hive. Loading this class
+ * with the isolated classloader however will result in it only being visible as a [[HiveClient]],
+ * not a [[HiveClientImpl]].
+ *
+ * This class needs to interact with multiple versions of Hive, but will always be compiled with
+ * the 'native', execution version of Hive. Therefore, any places where hive breaks compatibility
+ * must use reflection after matching on `version`.
+ *
+ * @param version the version of hive used when picking function calls that are not compatible.
+ * @param config a collection of configuration options that will be added to the hive conf before
+ * opening the hive client.
+ * @param initClassLoader the classloader used when creating the `state` field of
+ * this [[HiveClientImpl]].
+ */
+private[hive] class HiveClientImpl(
+ override val version: HiveVersion,
+ config: Map[String, String],
+ initClassLoader: ClassLoader,
+ val clientLoader: IsolatedClientLoader)
+ extends HiveClient
+ with Logging {
+
+ // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
+ private val outputBuffer = new CircularBuffer()
+
+ private val shim = version match {
+ case hive.v12 => new Shim_v0_12()
+ case hive.v13 => new Shim_v0_13()
+ case hive.v14 => new Shim_v0_14()
+ case hive.v1_0 => new Shim_v1_0()
+ case hive.v1_1 => new Shim_v1_1()
+ case hive.v1_2 => new Shim_v1_2()
+ }
+
+ // Create an internal session state for this HiveClientImpl.
+ val state = {
+ val original = Thread.currentThread().getContextClassLoader
+ // Switch to the initClassLoader.
+ Thread.currentThread().setContextClassLoader(initClassLoader)
+
+ // Set up kerberos credentials for UserGroupInformation.loginUser within
+ // current class loader
+ // Instead of using the spark conf of the current spark context, a new
+ // instance of SparkConf is needed for the original value of spark.yarn.keytab
+ // and spark.yarn.principal set in SparkSubmit, as yarn.Client resets the
+ // keytab configuration for the link name in distributed cache
+ val sparkConf = new SparkConf
+ if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
+ val principalName = sparkConf.get("spark.yarn.principal")
+ val keytabFileName = sparkConf.get("spark.yarn.keytab")
+ if (!new File(keytabFileName).exists()) {
+ throw new SparkException(s"Keytab file: ${keytabFileName}" +
+ " specified in spark.yarn.keytab does not exist")
+ } else {
+ logInfo("Attempting to login to Kerberos" +
+ s" using principal: ${principalName} and keytab: ${keytabFileName}")
+ UserGroupInformation.loginUserFromKeytab(principalName, keytabFileName)
+ }
+ }
+
+ val ret = try {
+ val initialConf = new HiveConf(classOf[SessionState])
+ // HiveConf is a Hadoop Configuration, which has a field of classLoader and
+ // the initial value will be the current thread's context class loader
+ // (i.e. initClassLoader here).
+ // We call initialConf.setClassLoader(initClassLoader) here to make
+ // this action explicit.
+ initialConf.setClassLoader(initClassLoader)
+ config.foreach { case (k, v) =>
+ if (k.toLowerCase.contains("password")) {
+ logDebug(s"Hive Config: $k=xxx")
+ } else {
+ logDebug(s"Hive Config: $k=$v")
+ }
+ initialConf.set(k, v)
+ }
+ val state = new SessionState(initialConf)
+ if (clientLoader.cachedHive != null) {
+ Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
+ }
+ SessionState.start(state)
+ state.out = new PrintStream(outputBuffer, true, "UTF-8")
+ state.err = new PrintStream(outputBuffer, true, "UTF-8")
+ state
+ } finally {
+ Thread.currentThread().setContextClassLoader(original)
+ }
+ ret
+ }
+
+ /** Returns the configuration for the current session. */
+ def conf: HiveConf = SessionState.get().getConf
+
+ override def getConf(key: String, defaultValue: String): String = {
+ conf.get(key, defaultValue)
+ }
+
+ // We use hive's conf for compatibility.
+ private val retryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES)
+ private val retryDelayMillis = shim.getMetastoreClientConnectRetryDelayMillis(conf)
+
+ /**
+ * Runs `f` with multiple retries in case the hive metastore is temporarily unreachable.
+ */
+ private def retryLocked[A](f: => A): A = clientLoader.synchronized {
+ // Hive sometimes retries internally, so set a deadline to avoid compounding delays.
+ val deadline = System.nanoTime + (retryLimit * retryDelayMillis * 1e6).toLong
+ var numTries = 0
+ var caughtException: Exception = null
+ do {
+ numTries += 1
+ try {
+ return f
+ } catch {
+ case e: Exception if causedByThrift(e) =>
+ caughtException = e
+ logWarning(
+ "HiveClient got thrift exception, destroying client and retrying " +
+ s"(${retryLimit - numTries} tries remaining)", e)
+ clientLoader.cachedHive = null
+ Thread.sleep(retryDelayMillis)
+ }
+ } while (numTries <= retryLimit && System.nanoTime < deadline)
+ if (System.nanoTime > deadline) {
+ logWarning("Deadline exceeded")
+ }
+ throw caughtException
+ }
+
+ private def causedByThrift(e: Throwable): Boolean = {
+ var target = e
+ while (target != null) {
+ val msg = target.getMessage()
+ if (msg != null && msg.matches("(?s).*(TApplication|TProtocol|TTransport)Exception.*")) {
+ return true
+ }
+ target = target.getCause()
+ }
+ false
+ }
+
+ def client: Hive = {
+ if (clientLoader.cachedHive != null) {
+ clientLoader.cachedHive.asInstanceOf[Hive]
+ } else {
+ val c = Hive.get(conf)
+ clientLoader.cachedHive = c
+ c
+ }
+ }
+
+ /**
+ * Runs `f` with ThreadLocal session state and classloaders configured for this version of hive.
+ */
+ def withHiveState[A](f: => A): A = retryLocked {
+ val original = Thread.currentThread().getContextClassLoader
+ // Set the thread local metastore client to the client associated with this HiveClientImpl.
+ Hive.set(client)
+ // The classloader in clientLoader could be changed after addJar, always use the latest
+ // classloader
+ state.getConf.setClassLoader(clientLoader.classLoader)
+ // setCurrentSessionState will use the classLoader associated
+ // with the HiveConf in `state` to override the context class loader of the current
+ // thread.
+ shim.setCurrentSessionState(state)
+ val ret = try f finally {
+ Thread.currentThread().setContextClassLoader(original)
+ }
+ ret
+ }
+
+ def setOut(stream: PrintStream): Unit = withHiveState {
+ state.out = stream
+ }
+
+ def setInfo(stream: PrintStream): Unit = withHiveState {
+ state.info = stream
+ }
+
+ def setError(stream: PrintStream): Unit = withHiveState {
+ state.err = stream
+ }
+
+ override def currentDatabase: String = withHiveState {
+ state.getCurrentDatabase
+ }
+
+ override def setCurrentDatabase(databaseName: String): Unit = withHiveState {
+ if (getDatabaseOption(databaseName).isDefined) {
+ state.setCurrentDatabase(databaseName)
+ } else {
+ throw new NoSuchDatabaseException
+ }
+ }
+
+ override def createDatabase(database: HiveDatabase): Unit = withHiveState {
+ client.createDatabase(
+ new Database(
+ database.name,
+ "",
+ new File(database.location).toURI.toString,
+ new java.util.HashMap),
+ true)
+ }
+
+ override def getDatabaseOption(name: String): Option[HiveDatabase] = withHiveState {
+ Option(client.getDatabase(name)).map { d =>
+ HiveDatabase(
+ name = d.getName,
+ location = d.getLocationUri)
+ }
+ }
+
+ override def getTableOption(
+ dbName: String,
+ tableName: String): Option[HiveTable] = withHiveState {
+
+ logDebug(s"Looking up $dbName.$tableName")
+
+ val hiveTable = Option(client.getTable(dbName, tableName, false))
+ val converted = hiveTable.map { h =>
+
+ HiveTable(
+ name = h.getTableName,
+ specifiedDatabase = Option(h.getDbName),
+ schema = h.getCols.asScala.map(f => HiveColumn(f.getName, f.getType, f.getComment)),
+ partitionColumns = h.getPartCols.asScala.map(f =>
+ HiveColumn(f.getName, f.getType, f.getComment)),
+ properties = h.getParameters.asScala.toMap,
+ serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap,
+ tableType = h.getTableType match {
+ case HTableType.MANAGED_TABLE => ManagedTable
+ case HTableType.EXTERNAL_TABLE => ExternalTable
+ case HTableType.VIRTUAL_VIEW => VirtualView
+ case HTableType.INDEX_TABLE => IndexTable
+ },
+ location = shim.getDataLocation(h),
+ inputFormat = Option(h.getInputFormatClass).map(_.getName),
+ outputFormat = Option(h.getOutputFormatClass).map(_.getName),
+ serde = Option(h.getSerializationLib),
+ viewText = Option(h.getViewExpandedText)).withClient(this)
+ }
+ converted
+ }
+
+ private def toInputFormat(name: String) =
+ Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
+
+ private def toOutputFormat(name: String) =
+ Utils.classForName(name)
+ .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
+
+ private def toQlTable(table: HiveTable): metadata.Table = {
+ val qlTable = new metadata.Table(table.database, table.name)
+
+ qlTable.setFields(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
+ qlTable.setPartCols(
+ table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
+ table.properties.foreach { case (k, v) => qlTable.setProperty(k, v) }
+ table.serdeProperties.foreach { case (k, v) => qlTable.setSerdeParam(k, v) }
+
+ // set owner
+ qlTable.setOwner(conf.getUser)
+ // set create time
+ qlTable.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
+
+ table.location.foreach { loc => shim.setDataLocation(qlTable, loc) }
+ table.inputFormat.map(toInputFormat).foreach(qlTable.setInputFormatClass)
+ table.outputFormat.map(toOutputFormat).foreach(qlTable.setOutputFormatClass)
+ table.serde.foreach(qlTable.setSerializationLib)
+
+ qlTable
+ }
+
+ private def toViewTable(view: HiveTable): metadata.Table = {
+ // TODO: this is duplicated with `toQlTable` except the table type stuff.
+ val tbl = new metadata.Table(view.database, view.name)
+ tbl.setTableType(HTableType.VIRTUAL_VIEW)
+ tbl.setSerializationLib(null)
+ tbl.clearSerDeInfo()
+
+ // TODO: we will save the same SQL string to original and expanded text, which is different
+ // from Hive.
+ tbl.setViewOriginalText(view.viewText.get)
+ tbl.setViewExpandedText(view.viewText.get)
+
+ tbl.setFields(view.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
+ view.properties.foreach { case (k, v) => tbl.setProperty(k, v) }
+
+ // set owner
+ tbl.setOwner(conf.getUser)
+ // set create time
+ tbl.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
+
+ tbl
+ }
+
+ override def createView(view: HiveTable): Unit = withHiveState {
+ client.createTable(toViewTable(view))
+ }
+
+ override def alertView(view: HiveTable): Unit = withHiveState {
+ client.alterTable(view.qualifiedName, toViewTable(view))
+ }
+
+ override def createTable(table: HiveTable): Unit = withHiveState {
+ val qlTable = toQlTable(table)
+ client.createTable(qlTable)
+ }
+
+ override def alterTable(table: HiveTable): Unit = withHiveState {
+ val qlTable = toQlTable(table)
+ client.alterTable(table.qualifiedName, qlTable)
+ }
+
+ private def toHivePartition(partition: metadata.Partition): HivePartition = {
+ val apiPartition = partition.getTPartition
+ HivePartition(
+ values = Option(apiPartition.getValues).map(_.asScala).getOrElse(Seq.empty),
+ storage = HiveStorageDescriptor(
+ location = apiPartition.getSd.getLocation,
+ inputFormat = apiPartition.getSd.getInputFormat,
+ outputFormat = apiPartition.getSd.getOutputFormat,
+ serde = apiPartition.getSd.getSerdeInfo.getSerializationLib,
+ serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap))
+ }
+
+ override def getPartitionOption(
+ table: HiveTable,
+ partitionSpec: JMap[String, String]): Option[HivePartition] = withHiveState {
+
+ val qlTable = toQlTable(table)
+ val qlPartition = client.getPartition(qlTable, partitionSpec, false)
+ Option(qlPartition).map(toHivePartition)
+ }
+
+ override def getAllPartitions(hTable: HiveTable): Seq[HivePartition] = withHiveState {
+ val qlTable = toQlTable(hTable)
+ shim.getAllPartitions(client, qlTable).map(toHivePartition)
+ }
+
+ override def getPartitionsByFilter(
+ hTable: HiveTable,
+ predicates: Seq[Expression]): Seq[HivePartition] = withHiveState {
+ val qlTable = toQlTable(hTable)
+ shim.getPartitionsByFilter(client, qlTable, predicates).map(toHivePartition)
+ }
+
+ override def listTables(dbName: String): Seq[String] = withHiveState {
+ client.getAllTables(dbName).asScala
+ }
+
+ /**
+ * Runs the specified SQL query using Hive.
+ */
+ override def runSqlHive(sql: String): Seq[String] = {
+ val maxResults = 100000
+ val results = runHive(sql, maxResults)
+ // It is very confusing when you only get back some of the results...
+ if (results.size == maxResults) sys.error("RESULTS POSSIBLY TRUNCATED")
+ results
+ }
+
+ /**
+ * Execute the command using Hive and return the results as a sequence. Each element
+ * in the sequence is one row.
+ */
+ protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = withHiveState {
+ logDebug(s"Running hiveql '$cmd'")
+ if (cmd.toLowerCase.startsWith("set")) { logDebug(s"Changing config: $cmd") }
+ try {
+ val cmd_trimmed: String = cmd.trim()
+ val tokens: Array[String] = cmd_trimmed.split("\\s+")
+ // The remainder of the command.
+ val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
+ val proc = shim.getCommandProcessor(tokens(0), conf)
+ proc match {
+ case driver: Driver =>
+ val response: CommandProcessorResponse = driver.run(cmd)
+ // Throw an exception if there is an error in query processing.
+ if (response.getResponseCode != 0) {
+ driver.close()
+ throw new QueryExecutionException(response.getErrorMessage)
+ }
+ driver.setMaxRows(maxRows)
+
+ val results = shim.getDriverResults(driver)
+ driver.close()
+ results
+
+ case _ =>
+ if (state.out != null) {
+ // scalastyle:off println
+ state.out.println(tokens(0) + " " + cmd_1)
+ // scalastyle:on println
+ }
+ Seq(proc.run(cmd_1).getResponseCode.toString)
+ }
+ } catch {
+ case e: Exception =>
+ logError(
+ s"""
+ |======================
+ |HIVE FAILURE OUTPUT
+ |======================
+ |${outputBuffer.toString}
+ |======================
+ |END HIVE FAILURE OUTPUT
+ |======================
+ """.stripMargin)
+ throw e
+ }
+ }
+
+ def loadPartition(
+ loadPath: String,
+ tableName: String,
+ partSpec: java.util.LinkedHashMap[String, String],
+ replace: Boolean,
+ holdDDLTime: Boolean,
+ inheritTableSpecs: Boolean,
+ isSkewedStoreAsSubdir: Boolean): Unit = withHiveState {
+ shim.loadPartition(
+ client,
+ new Path(loadPath), // TODO: Use URI
+ tableName,
+ partSpec,
+ replace,
+ holdDDLTime,
+ inheritTableSpecs,
+ isSkewedStoreAsSubdir)
+ }
+
+ def loadTable(
+ loadPath: String, // TODO URI
+ tableName: String,
+ replace: Boolean,
+ holdDDLTime: Boolean): Unit = withHiveState {
+ shim.loadTable(
+ client,
+ new Path(loadPath),
+ tableName,
+ replace,
+ holdDDLTime)
+ }
+
+ def loadDynamicPartitions(
+ loadPath: String,
+ tableName: String,
+ partSpec: java.util.LinkedHashMap[String, String],
+ replace: Boolean,
+ numDP: Int,
+ holdDDLTime: Boolean,
+ listBucketingEnabled: Boolean): Unit = withHiveState {
+ shim.loadDynamicPartitions(
+ client,
+ new Path(loadPath),
+ tableName,
+ partSpec,
+ replace,
+ numDP,
+ holdDDLTime,
+ listBucketingEnabled)
+ }
+
+ def addJar(path: String): Unit = {
+ val uri = new Path(path).toUri
+ val jarURL = if (uri.getScheme == null) {
+ // `path` is a local file path without a URL scheme
+ new File(path).toURI.toURL
+ } else {
+ // `path` is a URL with a scheme
+ uri.toURL
+ }
+ clientLoader.addJar(jarURL)
+ runSqlHive(s"ADD JAR $path")
+ }
+
+ def newSession(): HiveClientImpl = {
+ clientLoader.createClient().asInstanceOf[HiveClientImpl]
+ }
+
+ def reset(): Unit = withHiveState {
+ client.getAllTables("default").asScala.foreach { t =>
+ logDebug(s"Deleting table $t")
+ val table = client.getTable("default", t)
+ // the third argument caps the number of index descriptors returned
+ client.getIndexes("default", t, 255).asScala.foreach { index =>
+ shim.dropIndex(client, "default", t, index.getIndexName)
+ }
+ if (!table.isIndexTable) {
+ client.dropTable("default", t)
+ }
+ }
+ client.getAllDatabases.asScala.filterNot(_ == "default").foreach { db =>
+ logDebug(s"Dropping Database: $db")
+ // deleteData = true, ignoreUnknownDb = false, cascade = true
+ client.dropDatabase(db, true, false, true)
+ }
+ }
+}
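The addJar method above hinges on one check: whether the supplied path already carries a URI scheme. Below is a minimal, self-contained sketch of that check, assuming only Hadoop's Path class; the resolveJarUrl helper name is ours, for illustration.

  import java.io.File
  import java.net.URL

  import org.apache.hadoop.fs.Path

  object JarUrlSketch {
    // Mirrors the branch in addJar: a bare local path has no URI scheme and is
    // routed through java.io.File; a scheme-qualified path (file:/, hdfs://, ...)
    // is converted to a URL directly.
    def resolveJarUrl(path: String): URL = {
      val uri = new Path(path).toUri
      if (uri.getScheme == null) new File(path).toURI.toURL else uri.toURL
    }

    def main(args: Array[String]): Unit = {
      println(resolveJarUrl("/tmp/udfs.jar"))        // local path, no scheme: routed through File
      println(resolveJarUrl("file:///tmp/udfs.jar")) // already scheme-qualified: used as-is
    }
  }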
http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index ca636b0..70c10be 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -38,8 +38,8 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{IntegralType, StringType}
/**
- * A shim that defines the interface between ClientWrapper and the underlying Hive library used to
- * talk to the metastore. Each Hive version has its own implementation of this class, defining
+ * A shim that defines the interface between [[HiveClientImpl]] and the underlying Hive library used
+ * to talk to the metastore. Each Hive version has its own implementation of this class, defining
* version-specific implementations of the needed functions.
*
* The guideline for writing shims is:
@@ -52,7 +52,6 @@ private[client] sealed abstract class Shim {
/**
* Set the current SessionState to the given SessionState. Also, set the context classloader of
* the current thread to the one set in the HiveConf of this given `state`.
- * @param state
*/
def setCurrentSessionState(state: SessionState): Unit
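The shim contract described above boils down to: resolve each version-specific method once, then expose it behind a stable interface. A simplified sketch of that pattern follows; FakeMetastoreV1 and FakeMetastoreV2 are hypothetical stand-ins for the differing Hive APIs, not real Hive classes.

  import java.lang.reflect.Method

  // Two hypothetical metastore APIs whose table-listing method differs by version.
  class FakeMetastoreV1 { def fetchTables(db: String): Seq[String] = Seq(s"$db.t1") }
  class FakeMetastoreV2 { def listTables(db: String): Seq[String] = Seq(s"$db.t1", s"$db.t2") }

  sealed abstract class VersionShim {
    def allTables(client: AnyRef, db: String): Seq[String]
    protected def findMethod(klass: Class[_], name: String, args: Class[_]*): Method =
      klass.getMethod(name, args: _*)
  }

  class ShimV1 extends VersionShim {
    private lazy val m = findMethod(classOf[FakeMetastoreV1], "fetchTables", classOf[String])
    override def allTables(client: AnyRef, db: String): Seq[String] =
      m.invoke(client, db).asInstanceOf[Seq[String]]
  }

  class ShimV2 extends VersionShim {
    private lazy val m = findMethod(classOf[FakeMetastoreV2], "listTables", classOf[String])
    override def allTables(client: AnyRef, db: String): Seq[String] =
      m.invoke(client, db).asInstanceOf[Seq[String]]
  }

Callers hold a VersionShim and never see which version is underneath; only the shim pays the reflection cost, and only once per method.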
http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 010051d..dca7396 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -124,15 +124,15 @@ private[hive] object IsolatedClientLoader extends Logging {
}
/**
- * Creates a Hive `ClientInterface` using a classloader that works according to the following rules:
+ * Creates a [[HiveClient]] using a classloader that works according to the following rules:
* - Shared classes: Java, Scala, logging, and Spark classes are delegated to `baseClassLoader`
- * allowing the results of calls to the `ClientInterface` to be visible externally.
+ * allowing the results of calls to the [[HiveClient]] to be visible externally.
* - Hive classes: new instances are loaded from `execJars`. These classes are not
* accessible externally due to their custom loading.
- * - ClientWrapper: a new copy is created for each instance of `IsolatedClassLoader`.
+ * - [[HiveClientImpl]]: a new copy is created for each instance of `IsolatedClientLoader`.
* This new instance is able to see a specific version of hive without using reflection. Since
* this is a unique instance, it is not visible externally other than as a generic
- * `ClientInterface`, unless `isolationOn` is set to `false`.
+ * [[HiveClient]], unless `isolationOn` is set to `false`.
*
* @param version The version of hive on the classpath, used to pick specific function signatures
* that are not compatible across versions.
@@ -179,7 +179,7 @@ private[hive] class IsolatedClientLoader(
/** True if `name` refers to a spark class that must see specific version of Hive. */
protected def isBarrierClass(name: String): Boolean =
- name.startsWith(classOf[ClientWrapper].getName) ||
+ name.startsWith(classOf[HiveClientImpl].getName) ||
name.startsWith(classOf[Shim].getName) ||
barrierPrefixes.exists(name.startsWith)
@@ -233,9 +233,9 @@ private[hive] class IsolatedClientLoader(
}
/** The isolated client interface to Hive. */
- private[hive] def createClient(): ClientInterface = {
+ private[hive] def createClient(): HiveClient = {
if (!isolationOn) {
- return new ClientWrapper(version, config, baseClassLoader, this)
+ return new HiveClientImpl(version, config, baseClassLoader, this)
}
// Pre-reflective instantiation setup.
logDebug("Initializing the logger to avoid disaster...")
@@ -244,10 +244,10 @@ private[hive] class IsolatedClientLoader(
try {
classLoader
- .loadClass(classOf[ClientWrapper].getName)
+ .loadClass(classOf[HiveClientImpl].getName)
.getConstructors.head
.newInstance(version, config, classLoader, this)
- .asInstanceOf[ClientInterface]
+ .asInstanceOf[HiveClient]
} catch {
case e: InvocationTargetException =>
if (e.getCause().isInstanceOf[NoClassDefFoundError]) {
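The reflective construction above is the heart of the isolation trick: the implementation class is loaded through the isolated classloader, but the caller only ever types it as the shared interface. A stripped-down sketch, with a hypothetical Greeter interface standing in for HiveClient:

  import java.net.{URL, URLClassLoader}

  trait Greeter { def greet(name: String): String }

  object IsolationSketch {
    // Load `implName` through a child classloader and hand it back typed only
    // as the Greeter interface, which the parent loader already knows. The real
    // IsolatedClientLoader uses a custom loader with shared/barrier rules
    // instead of plain parent delegation.
    def createIsolated(jars: Array[URL], implName: String): Greeter =
      new URLClassLoader(jars, getClass.getClassLoader)
        .loadClass(implName)
        .getConstructors.head
        .newInstance()
        .asInstanceOf[Greeter]
  }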
http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 56cab1a..d5ed838 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -38,13 +38,13 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.util.sequenceOption
import org.apache.spark.sql.hive.HiveShim._
-import org.apache.spark.sql.hive.client.ClientWrapper
+import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.types._
private[hive] class HiveFunctionRegistry(
underlying: analysis.FunctionRegistry,
- executionHive: ClientWrapper)
+ executionHive: HiveClientImpl)
extends analysis.FunctionRegistry with HiveInspectors {
def getFunctionInfo(name: String): FunctionInfo = {
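HiveFunctionRegistry layers Hive's function catalog underneath Spark's built-in one: lookups go to the underlying registry first and fall back to Hive only for unknown names. A minimal sketch of that fallback shape, with simplified stand-in types rather than the real FunctionRegistry API:

  trait Registry { def lookup(name: String): Option[String] }

  class FallbackRegistry(primary: Registry, fallback: Registry) extends Registry {
    // Consult the built-in registry first; only unresolved names reach the fallback.
    override def lookup(name: String): Option[String] =
      primary.lookup(name).orElse(fallback.lookup(name))
  }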
http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index a33223a..246108e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.CacheTableCommand
import org.apache.spark.sql.hive._
-import org.apache.spark.sql.hive.client.ClientWrapper
+import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.hive.execution.HiveNativeCommand
import org.apache.spark.util.{ShutdownHookManager, Utils}
@@ -458,7 +458,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), this.executionHive)
}
-private[hive] class TestHiveFunctionRegistry(fr: SimpleFunctionRegistry, client: ClientWrapper)
+private[hive] class TestHiveFunctionRegistry(fr: SimpleFunctionRegistry, client: HiveClientImpl)
extends HiveFunctionRegistry(fr, client) {
private val removedFunctions =
http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index ff10a25..1344a2c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.tags.ExtendedHiveTest
import org.apache.spark.util.Utils
/**
- * A simple set of tests that call the methods of a hive ClientInterface, loading different version
+ * A simple set of tests that call the methods of a [[HiveClient]], loading different versions
of hive from Maven Central. These tests are simple in that they are mostly just testing to make
sure that reflective calls are not throwing NoSuchMethod errors, but the actual functionality
is not fully tested.
@@ -101,7 +101,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
private val versions = Seq("12", "13", "14", "1.0.0", "1.1.0", "1.2.0")
- private var client: ClientInterface = null
+ private var client: HiveClient = null
versions.foreach { version =>
test(s"$version: create client") {
http://git-wip-us.apache.org/repos/asf/spark/blob/2cbc4128/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 0d62d79..1ada2e3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -199,7 +199,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
"Extended Usage")
checkExistence(sql("describe functioN abcadf"), true,
- "Function: abcadf is not found.")
+ "Function: abcadf not found.")
checkExistence(sql("describe functioN `~`"), true,
"Function: ~",