Posted to commits@spark.apache.org by rx...@apache.org on 2016/04/22 02:58:02 UTC
spark git commit: [SPARK-14824][SQL] Rename HiveContext object to HiveUtils
Repository: spark
Updated Branches:
refs/heads/master 0bf8df250 -> df1953f0d
[SPARK-14824][SQL] Rename HiveContext object to HiveUtils
## What changes were proposed in this pull request?
Just a rename so we can get rid of `HiveContext.scala`. Note that this will conflict with #12585.
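For illustration, a minimal sketch (not part of this commit) of what the rename means for Spark-internal call sites. Both companion objects are package-private (`private[hive]` before, `private[spark]` after), so the snippet assumes it is compiled inside the `org.apache.spark.sql.hive` package; `hiveExecutionVersion` is a real member moved by this diff, the object name is hypothetical:

```scala
package org.apache.spark.sql.hive

// Hypothetical sketch of the rename; not part of this commit.
object RenameSketch {
  // Before: constants and helpers lived on the HiveContext companion object:
  //   val version = HiveContext.hiveExecutionVersion
  // After: the same members live on the standalone HiveUtils object; the
  // HiveContext class itself is unchanged.
  val version: String = HiveUtils.hiveExecutionVersion // "1.2.1"
}
```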
## How was this patch tested?
No change in functionality.
Author: Andrew Or <an...@databricks.com>
Closes #12586 from andrewor14/rename-hc-object.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df1953f0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df1953f0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df1953f0
Branch: refs/heads/master
Commit: df1953f0df8b43136157a18bea05fd6750906f68
Parents: 0bf8df2
Author: Andrew Or <an...@databricks.com>
Authored: Thu Apr 21 17:57:59 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Apr 21 17:57:59 2016 -0700
----------------------------------------------------------------------
.../SparkExecuteStatementOperation.scala | 4 +-
.../hive/thriftserver/SparkSQLCLIDriver.scala | 4 +-
.../sql/hive/thriftserver/SparkSQLEnv.scala | 4 +-
.../thriftserver/SparkSQLSessionManager.scala | 4 +-
.../thriftserver/HiveThriftServer2Suites.scala | 6 +-
.../hive/execution/HiveCompatibilitySuite.scala | 6 +-
.../org/apache/spark/sql/hive/HiveContext.scala | 496 -------------------
.../spark/sql/hive/HiveQueryExecution.scala | 2 +-
.../spark/sql/hive/HiveSessionState.scala | 10 +-
.../apache/spark/sql/hive/HiveSharedState.scala | 4 +-
.../org/apache/spark/sql/hive/HiveUtils.scala | 496 +++++++++++++++++++
.../org/apache/spark/sql/hive/TableReader.scala | 6 +-
.../sql/hive/client/IsolatedClientLoader.scala | 4 +-
.../apache/spark/sql/hive/test/TestHive.scala | 8 +-
.../sql/hive/HiveExternalCatalogSuite.scala | 2 +-
.../sql/hive/MetastoreDataSourcesSuite.scala | 2 +-
.../hive/ParquetHiveCompatibilitySuite.scala | 2 +-
.../spark/sql/hive/client/VersionsSuite.scala | 6 +-
.../sql/hive/execution/SQLQuerySuite.scala | 14 +-
.../spark/sql/hive/orc/OrcQuerySuite.scala | 4 +-
.../apache/spark/sql/hive/parquetSuites.scala | 6 +-
21 files changed, 545 insertions(+), 545 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/df1953f0/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index d89c3b4..3025660 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -35,7 +35,7 @@ import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Row => SparkRow}
import org.apache.spark.sql.execution.command.SetCommand
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
+import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, HiveUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.{Utils => SparkUtils}
@@ -98,7 +98,7 @@ private[hive] class SparkExecuteStatementOperation(
case TimestampType =>
to += from.getAs[Timestamp](ordinal)
case BinaryType | _: ArrayType | _: StructType | _: MapType =>
- val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))
+ val hiveString = HiveUtils.toHiveString((from.get(ordinal), dataTypes(ordinal)))
to += hiveString
}
}
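For context, a minimal sketch of what the renamed `toHiveString` helper returns, based on its implementation later in this diff. The method is `protected[sql]`, so the snippet assumes it is compiled inside the `org.apache.spark.sql` package tree; the object name is hypothetical:

```scala
package org.apache.spark.sql.hive

import org.apache.spark.sql.types._

// Hypothetical sketch, for illustration only.
object ToHiveStringSketch {
  def main(args: Array[String]): Unit = {
    // Arrays are rendered in Hive's bracket syntax.
    println(HiveUtils.toHiveString((Seq(1, 2, 3), ArrayType(IntegerType)))) // [1,2,3]
    // Top-level nulls are rendered as the string "NULL".
    println(HiveUtils.toHiveString((null, StringType))) // NULL
  }
}
```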
http://git-wip-us.apache.org/repos/asf/spark/blob/df1953f0/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 5769328..057fbbe 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -39,7 +39,7 @@ import org.apache.thrift.transport.TSocket
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.util.ShutdownHookManager
/**
@@ -82,7 +82,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
val cliConf = new HiveConf(classOf[SessionState])
// Override the location of the metastore since this is only used for local execution.
- HiveContext.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
+ HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
case (key, value) => cliConf.set(key, value)
}
val sessionState = new CliSessionState(cliConf)
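A sketch of the pattern the hunk above uses: `newTemporaryConfiguration` returns plain key/value overrides that point the metastore at a throwaway local Derby database (see its implementation later in this diff). `HiveUtils` is `private[spark]`, so the snippet assumes it is compiled inside a Spark package; the object name is hypothetical:

```scala
package org.apache.spark.sql.hive

// Hypothetical sketch, for illustration only.
object TempMetastoreSketch {
  def main(args: Array[String]): Unit = {
    val overrides: Map[String, String] =
      HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false)
    // The Derby connection URL points at a metastore under a fresh temp dir,
    // e.g. jdbc:derby:;databaseName=/tmp/.../metastore;create=true
    println(overrides("javax.jdo.option.ConnectionURL"))
  }
}
```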
http://git-wip-us.apache.org/repos/asf/spark/blob/df1953f0/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 2679ac1..465457f 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.StatsReportListener
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.{HiveContext, HiveUtils}
import org.apache.spark.util.Utils
/** A singleton object for the master program. The slaves should not access this. */
@@ -62,7 +62,7 @@ private[hive] object SparkSQLEnv extends Logging {
hiveContext.sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
hiveContext.sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
- hiveContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
+ hiveContext.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
if (log.isDebugEnabled) {
hiveContext.sessionState.hiveconf.getAllProperties.asScala.toSeq.sorted
http://git-wip-us.apache.org/repos/asf/spark/blob/df1953f0/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index f492b56..a0beffd 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -27,7 +27,7 @@ import org.apache.hive.service.cli.session.SessionManager
import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.hive.service.server.HiveServer2
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.{HiveContext, HiveUtils}
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
@@ -76,7 +76,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
} else {
hiveContext.newSession()
}
- ctx.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
+ ctx.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx
sessionHandle
}
http://git-wip-us.apache.org/repos/asf/spark/blob/df1953f0/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index ee14b6d..bc45334 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -42,7 +42,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
import org.apache.spark.util.{ThreadUtils, Utils}
@@ -115,7 +115,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
val resultSet = statement.executeQuery("SET spark.sql.hive.version")
resultSet.next()
assert(resultSet.getString(1) === "spark.sql.hive.version")
- assert(resultSet.getString(2) === HiveContext.hiveExecutionVersion)
+ assert(resultSet.getString(2) === HiveUtils.hiveExecutionVersion)
}
}
@@ -624,7 +624,7 @@ class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
val resultSet = statement.executeQuery("SET spark.sql.hive.version")
resultSet.next()
assert(resultSet.getString(1) === "spark.sql.hive.version")
- assert(resultSet.getString(2) === HiveContext.hiveExecutionVersion)
+ assert(resultSet.getString(2) === HiveUtils.hiveExecutionVersion)
}
}
}
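The two hunks above assert the same behavior: `SET spark.sql.hive.version` returns the execution Hive version over JDBC. A standalone sketch of that check against a running Thrift server (the URL and port are assumptions; 10000 is the default binary-mode port):

```scala
import java.sql.DriverManager

// Hypothetical sketch against a locally running HiveThriftServer2.
object HiveVersionCheck {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver") // register the Hive JDBC driver
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000")
    try {
      val rs = conn.createStatement().executeQuery("SET spark.sql.hive.version")
      rs.next()
      println(s"${rs.getString(1)} = ${rs.getString(2)}") // spark.sql.hive.version = 1.2.1
    } finally {
      conn.close()
    }
  }
}
```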
http://git-wip-us.apache.org/repos/asf/spark/blob/df1953f0/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 49fd198..b7d6c26 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -23,7 +23,7 @@ import java.util.{Locale, TimeZone}
import org.scalatest.BeforeAndAfter
import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.internal.SQLConf
@@ -60,7 +60,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
TestHive.sessionState.functionRegistry.unregisterFunction("hash")
// Ensures that the plans generation use metastore relation and not OrcRelation
// Was done because SqlBuilder does not work with plans having logical relation
- TestHive.setConf(HiveContext.CONVERT_METASTORE_ORC, false)
+ TestHive.setConf(HiveUtils.CONVERT_METASTORE_ORC, false)
RuleExecutor.resetTime()
}
@@ -71,7 +71,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
Locale.setDefault(originalLocale)
TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
- TestHive.setConf(HiveContext.CONVERT_METASTORE_ORC, originalConvertMetastoreOrc)
+ TestHive.setConf(HiveUtils.CONVERT_METASTORE_ORC, originalConvertMetastoreOrc)
TestHive.sessionState.functionRegistry.restore()
// For debugging dump some statistics about how much time was spent in various optimizer rules.
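For reference, the typed `setConf` calls above are interchangeable with the raw string key defined in this diff; a one-line sketch:

```scala
// Equivalent to TestHive.setConf(HiveUtils.CONVERT_METASTORE_ORC, false):
TestHive.setConf("spark.sql.hive.convertMetastoreOrc", "false")
```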
http://git-wip-us.apache.org/repos/asf/spark/blob/df1953f0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
deleted file mode 100644
index b2ce3e0..0000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ /dev/null
@@ -1,496 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.io.File
-import java.net.{URL, URLClassLoader}
-import java.nio.charset.StandardCharsets
-import java.sql.Timestamp
-import java.util.concurrent.TimeUnit
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.HashMap
-import scala.language.implicitConversions
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hive.common.`type`.HiveDecimal
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
-import org.apache.hadoop.util.VersionInfo
-
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
-import org.apache.spark.sql._
-import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf._
-import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
-
-/**
- * An instance of the Spark SQL execution engine that integrates with data stored in Hive.
- * Configuration for Hive is read from hive-site.xml on the classpath.
- *
- * @since 1.0.0
- */
-class HiveContext private[hive](
- @transient private val sparkSession: SparkSession,
- isRootContext: Boolean)
- extends SQLContext(sparkSession, isRootContext) with Logging {
-
- self =>
-
- def this(sc: SparkContext) = {
- this(new SparkSession(HiveContext.withHiveExternalCatalog(sc)), true)
- }
-
- def this(sc: JavaSparkContext) = this(sc.sc)
-
- /**
- * Returns a new HiveContext as a new session, which will have a separate SQLConf, UDF/UDAF
- * registry, temporary tables and SessionState, but will share the same CacheManager,
- * IsolatedClientLoader and Hive client (both execution and metadata) with the existing
- * HiveContext.
- */
- override def newSession(): HiveContext = {
- new HiveContext(sparkSession.newSession(), isRootContext = false)
- }
-
- protected[sql] override def sessionState: HiveSessionState = {
- sparkSession.sessionState.asInstanceOf[HiveSessionState]
- }
-
- protected[sql] override def sharedState: HiveSharedState = {
- sparkSession.sharedState.asInstanceOf[HiveSharedState]
- }
-
-}
-
-
-private[hive] object HiveContext extends Logging {
-
- def withHiveExternalCatalog(sc: SparkContext): SparkContext = {
- sc.conf.set(CATALOG_IMPLEMENTATION.key, "hive")
- sc
- }
-
- /** The version of hive used internally by Spark SQL. */
- val hiveExecutionVersion: String = "1.2.1"
-
- val HIVE_METASTORE_VERSION = SQLConfigBuilder("spark.sql.hive.metastore.version")
- .doc("Version of the Hive metastore. Available options are " +
- s"<code>0.12.0</code> through <code>$hiveExecutionVersion</code>.")
- .stringConf
- .createWithDefault(hiveExecutionVersion)
-
- val HIVE_EXECUTION_VERSION = SQLConfigBuilder("spark.sql.hive.version")
- .doc("Version of Hive used internally by Spark SQL.")
- .stringConf
- .createWithDefault(hiveExecutionVersion)
-
- val HIVE_METASTORE_JARS = SQLConfigBuilder("spark.sql.hive.metastore.jars")
- .doc(s"""
- | Location of the jars that should be used to instantiate the HiveMetastoreClient.
- | This property can be one of three options:
- | 1. "builtin"
- | Use Hive ${hiveExecutionVersion}, which is bundled with the Spark assembly when
- | <code>-Phive</code> is enabled. When this option is chosen,
- | <code>spark.sql.hive.metastore.version</code> must be either
- | <code>${hiveExecutionVersion}</code> or not defined.
- | 2. "maven"
- | Use Hive jars of specified version downloaded from Maven repositories.
- | 3. A classpath in the standard format for both Hive and Hadoop.
- """.stripMargin)
- .stringConf
- .createWithDefault("builtin")
-
- val CONVERT_METASTORE_PARQUET = SQLConfigBuilder("spark.sql.hive.convertMetastoreParquet")
- .doc("When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of " +
- "the built in support.")
- .booleanConf
- .createWithDefault(true)
-
- val CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING =
- SQLConfigBuilder("spark.sql.hive.convertMetastoreParquet.mergeSchema")
- .doc("When true, also tries to merge possibly different but compatible Parquet schemas in " +
- "different Parquet data files. This configuration is only effective " +
- "when \"spark.sql.hive.convertMetastoreParquet\" is true.")
- .booleanConf
- .createWithDefault(false)
-
- val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS")
- .doc("When true, a table created by a Hive CTAS statement (no USING clause) will be " +
- "converted to a data source table, using the data source set by spark.sql.sources.default.")
- .booleanConf
- .createWithDefault(false)
-
- val CONVERT_METASTORE_ORC = SQLConfigBuilder("spark.sql.hive.convertMetastoreOrc")
- .doc("When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " +
- "the built in support.")
- .booleanConf
- .createWithDefault(true)
-
- val HIVE_METASTORE_SHARED_PREFIXES = SQLConfigBuilder("spark.sql.hive.metastore.sharedPrefixes")
- .doc("A comma separated list of class prefixes that should be loaded using the classloader " +
- "that is shared between Spark SQL and a specific version of Hive. An example of classes " +
- "that should be shared is JDBC drivers that are needed to talk to the metastore. Other " +
- "classes that need to be shared are those that interact with classes that are already " +
- "shared. For example, custom appenders that are used by log4j.")
- .stringConf
- .toSequence
- .createWithDefault(jdbcPrefixes)
-
- private def jdbcPrefixes = Seq(
- "com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc")
-
- val HIVE_METASTORE_BARRIER_PREFIXES = SQLConfigBuilder("spark.sql.hive.metastore.barrierPrefixes")
- .doc("A comma separated list of class prefixes that should explicitly be reloaded for each " +
- "version of Hive that Spark SQL is communicating with. For example, Hive UDFs that are " +
- "declared in a prefix that typically would be shared (i.e. <code>org.apache.spark.*</code>).")
- .stringConf
- .toSequence
- .createWithDefault(Nil)
-
- val HIVE_THRIFT_SERVER_ASYNC = SQLConfigBuilder("spark.sql.hive.thriftServer.async")
- .doc("When set to true, Hive Thrift server executes SQL queries in an asynchronous way.")
- .booleanConf
- .createWithDefault(true)
-
- /**
- * The version of the hive client that will be used to communicate with the metastore. Note that
- * this does not necessarily need to be the same version of Hive that is used internally by
- * Spark SQL for execution.
- */
- private def hiveMetastoreVersion(conf: SQLConf): String = {
- conf.getConf(HIVE_METASTORE_VERSION)
- }
-
- /**
- * The location of the jars that should be used to instantiate the HiveMetastoreClient. This
- * property can be one of three options:
- * - a classpath in the standard format for both hive and hadoop.
- * - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This
- * option is only valid when using the execution version of Hive.
- * - maven - download the correct version of hive on demand from maven.
- */
- private def hiveMetastoreJars(conf: SQLConf): String = {
- conf.getConf(HIVE_METASTORE_JARS)
- }
-
- /**
- * A comma separated list of class prefixes that should be loaded using the classloader that
- * is shared between Spark SQL and a specific version of Hive. An example of classes that should
- * be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need
- * to be shared are those that interact with classes that are already shared. For example,
- * custom appenders that are used by log4j.
- */
- private def hiveMetastoreSharedPrefixes(conf: SQLConf): Seq[String] = {
- conf.getConf(HIVE_METASTORE_SHARED_PREFIXES).filterNot(_ == "")
- }
-
- /**
- * A comma separated list of class prefixes that should explicitly be reloaded for each version
- * of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a
- * prefix that typically would be shared (i.e. org.apache.spark.*)
- */
- private def hiveMetastoreBarrierPrefixes(conf: SQLConf): Seq[String] = {
- conf.getConf(HIVE_METASTORE_BARRIER_PREFIXES).filterNot(_ == "")
- }
-
- /**
- * Configurations needed to create a [[HiveClient]].
- */
- private[hive] def hiveClientConfigurations(hiveconf: HiveConf): Map[String, String] = {
- // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
- // of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks backwards-
- // compatibility when users are trying to connect to a Hive metastore of a lower version,
- // because these options are expected to be integral values in lower versions of Hive.
- //
- // Here we enumerate all time `ConfVar`s and convert their values to numeric strings according
- // to their output time units.
- Seq(
- ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY -> TimeUnit.SECONDS,
- ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT -> TimeUnit.SECONDS,
- ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME -> TimeUnit.SECONDS,
- ConfVars.HMSHANDLERINTERVAL -> TimeUnit.MILLISECONDS,
- ConfVars.METASTORE_EVENT_DB_LISTENER_TTL -> TimeUnit.SECONDS,
- ConfVars.METASTORE_EVENT_CLEAN_FREQ -> TimeUnit.SECONDS,
- ConfVars.METASTORE_EVENT_EXPIRY_DURATION -> TimeUnit.SECONDS,
- ConfVars.METASTORE_AGGREGATE_STATS_CACHE_TTL -> TimeUnit.SECONDS,
- ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_WRITER_WAIT -> TimeUnit.MILLISECONDS,
- ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_READER_WAIT -> TimeUnit.MILLISECONDS,
- ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT -> TimeUnit.SECONDS,
- ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL -> TimeUnit.MILLISECONDS,
- ConfVars.HIVE_STATS_JDBC_TIMEOUT -> TimeUnit.SECONDS,
- ConfVars.HIVE_STATS_RETRIES_WAIT -> TimeUnit.MILLISECONDS,
- ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES -> TimeUnit.SECONDS,
- ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS,
- ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME -> TimeUnit.MILLISECONDS,
- ConfVars.HIVE_TXN_TIMEOUT -> TimeUnit.SECONDS,
- ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT -> TimeUnit.SECONDS,
- ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL -> TimeUnit.SECONDS,
- ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL -> TimeUnit.MILLISECONDS,
- ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME -> TimeUnit.MILLISECONDS,
- ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS,
- ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE -> TimeUnit.SECONDS,
- ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH -> TimeUnit.MILLISECONDS,
- ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT -> TimeUnit.SECONDS,
- ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS,
- ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT -> TimeUnit.SECONDS,
- ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME -> TimeUnit.SECONDS,
- ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT -> TimeUnit.MILLISECONDS,
- ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL -> TimeUnit.MILLISECONDS,
- ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS,
- ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT -> TimeUnit.MILLISECONDS,
- ConfVars.SERVER_READ_SOCKET_TIMEOUT -> TimeUnit.SECONDS,
- ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL -> TimeUnit.MILLISECONDS,
- ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT -> TimeUnit.SECONDS,
- ConfVars.SPARK_JOB_MONITOR_TIMEOUT -> TimeUnit.SECONDS,
- ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT -> TimeUnit.MILLISECONDS,
- ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT -> TimeUnit.MILLISECONDS
- ).map { case (confVar, unit) =>
- confVar.varname -> hiveconf.getTimeVar(confVar, unit).toString
- }.toMap
- }
-
- /**
- * Create a [[HiveClient]] used for execution.
- *
- * Currently this must always be Hive 1.2.1, as this is the version of Hive that is packaged
- * with Spark SQL. This copy of the client is used for execution related tasks like
- * registering temporary functions or ensuring that the ThreadLocal SessionState is
- * correctly populated. This copy of Hive is *not* used for storing persistent metadata,
- * and only points to a dummy metastore in a temporary directory.
- */
- protected[hive] def newClientForExecution(
- conf: SparkConf,
- hadoopConf: Configuration): HiveClientImpl = {
- logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
- val loader = new IsolatedClientLoader(
- version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
- sparkConf = conf,
- execJars = Seq(),
- hadoopConf = hadoopConf,
- config = newTemporaryConfiguration(useInMemoryDerby = true),
- isolationOn = false,
- baseClassLoader = Utils.getContextOrSparkClassLoader)
- loader.createClient().asInstanceOf[HiveClientImpl]
- }
-
- /**
- * Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore.
- *
- * The version of the Hive client that is used here must match the metastore that is configured
- * in the hive-site.xml file.
- */
- protected[hive] def newClientForMetadata(
- conf: SparkConf,
- hadoopConf: Configuration): HiveClient = {
- val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
- val configurations = hiveClientConfigurations(hiveConf)
- newClientForMetadata(conf, hiveConf, hadoopConf, configurations)
- }
-
- protected[hive] def newClientForMetadata(
- conf: SparkConf,
- hiveConf: HiveConf,
- hadoopConf: Configuration,
- configurations: Map[String, String]): HiveClient = {
- val sqlConf = new SQLConf
- sqlConf.setConf(SQLContext.getSQLProperties(conf))
- val hiveMetastoreVersion = HiveContext.hiveMetastoreVersion(sqlConf)
- val hiveMetastoreJars = HiveContext.hiveMetastoreJars(sqlConf)
- val hiveMetastoreSharedPrefixes = HiveContext.hiveMetastoreSharedPrefixes(sqlConf)
- val hiveMetastoreBarrierPrefixes = HiveContext.hiveMetastoreBarrierPrefixes(sqlConf)
- val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)
-
- val defaultWarehouseLocation = hiveConf.get("hive.metastore.warehouse.dir")
- logInfo("default warehouse location is " + defaultWarehouseLocation)
-
- // `configurations` goes second to override other settings.
- val allConfig = hiveConf.asScala.map(e => e.getKey -> e.getValue).toMap ++ configurations
-
- val isolatedLoader = if (hiveMetastoreJars == "builtin") {
- if (hiveExecutionVersion != hiveMetastoreVersion) {
- throw new IllegalArgumentException(
- "Builtin jars can only be used when hive execution version == hive metastore version. " +
- s"Execution: $hiveExecutionVersion != Metastore: $hiveMetastoreVersion. " +
- "Specify a vaild path to the correct hive jars using $HIVE_METASTORE_JARS " +
- s"or change ${HIVE_METASTORE_VERSION.key} to $hiveExecutionVersion.")
- }
-
- // We recursively find all jars in the class loader chain,
- // starting from the given classLoader.
- def allJars(classLoader: ClassLoader): Array[URL] = classLoader match {
- case null => Array.empty[URL]
- case urlClassLoader: URLClassLoader =>
- urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent)
- case other => allJars(other.getParent)
- }
-
- val classLoader = Utils.getContextOrSparkClassLoader
- val jars = allJars(classLoader)
- if (jars.length == 0) {
- throw new IllegalArgumentException(
- "Unable to locate hive jars to connect to metastore. " +
- "Please set spark.sql.hive.metastore.jars.")
- }
-
- logInfo(
- s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
- new IsolatedClientLoader(
- version = metaVersion,
- sparkConf = conf,
- hadoopConf = hadoopConf,
- execJars = jars.toSeq,
- config = allConfig,
- isolationOn = true,
- barrierPrefixes = hiveMetastoreBarrierPrefixes,
- sharedPrefixes = hiveMetastoreSharedPrefixes)
- } else if (hiveMetastoreJars == "maven") {
- // TODO: Support for loading the jars from an already downloaded location.
- logInfo(
- s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
- IsolatedClientLoader.forVersion(
- hiveMetastoreVersion = hiveMetastoreVersion,
- hadoopVersion = VersionInfo.getVersion,
- sparkConf = conf,
- hadoopConf = hadoopConf,
- config = allConfig,
- barrierPrefixes = hiveMetastoreBarrierPrefixes,
- sharedPrefixes = hiveMetastoreSharedPrefixes)
- } else {
- // Convert to files and expand any directories.
- val jars =
- hiveMetastoreJars
- .split(File.pathSeparator)
- .flatMap {
- case path if new File(path).getName == "*" =>
- val files = new File(path).getParentFile.listFiles()
- if (files == null) {
- logWarning(s"Hive jar path '$path' does not exist.")
- Nil
- } else {
- files.filter(_.getName.toLowerCase.endsWith(".jar"))
- }
- case path =>
- new File(path) :: Nil
- }
- .map(_.toURI.toURL)
-
- logInfo(
- s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " +
- s"using ${jars.mkString(":")}")
- new IsolatedClientLoader(
- version = metaVersion,
- sparkConf = conf,
- hadoopConf = hadoopConf,
- execJars = jars.toSeq,
- config = allConfig,
- isolationOn = true,
- barrierPrefixes = hiveMetastoreBarrierPrefixes,
- sharedPrefixes = hiveMetastoreSharedPrefixes)
- }
- isolatedLoader.createClient()
- }
-
- /** Constructs a configuration for hive, where the metastore is located in a temp directory. */
- def newTemporaryConfiguration(useInMemoryDerby: Boolean): Map[String, String] = {
- val withInMemoryMode = if (useInMemoryDerby) "memory:" else ""
-
- val tempDir = Utils.createTempDir()
- val localMetastore = new File(tempDir, "metastore")
- val propMap: HashMap[String, String] = HashMap()
- // We have to mask all properties in hive-site.xml that relate to the metastore data source,
- // as we use a local metastore here.
- HiveConf.ConfVars.values().foreach { confvar =>
- if (confvar.varname.contains("datanucleus") || confvar.varname.contains("jdo")
- || confvar.varname.contains("hive.metastore.rawstore.impl")) {
- propMap.put(confvar.varname, confvar.getDefaultExpr())
- }
- }
- propMap.put(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, localMetastore.toURI.toString)
- propMap.put(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
- s"jdbc:derby:${withInMemoryMode};databaseName=${localMetastore.getAbsolutePath};create=true")
- propMap.put("datanucleus.rdbms.datastoreAdapterClassName",
- "org.datanucleus.store.rdbms.adapter.DerbyAdapter")
-
- // SPARK-11783: When "hive.metastore.uris" is set, the metastore connection mode will be
- // remote (https://cwiki.apache.org/confluence/display/Hive/AdminManual+MetastoreAdmin
- // mentions that "If hive.metastore.uris is empty local mode is assumed, remote otherwise").
- // Remote means that the metastore server is running in its own process.
- // When the mode is remote, configurations like "javax.jdo.option.ConnectionURL" will not be
- // used (because they are used by remote metastore server that talks to the database).
- // Because the execution Hive should always connect to an embedded Derby metastore,
- // we have to remove the value of hive.metastore.uris so that the execution Hive client
- // connects to the actual embedded Derby metastore instead of the remote metastore.
- // You can search HiveConf.ConfVars.METASTOREURIS in the code of HiveConf (in Hive's repo).
- // Then, you will find that the local metastore mode is only set to true when
- // hive.metastore.uris is not set.
- propMap.put(ConfVars.METASTOREURIS.varname, "")
-
- propMap.toMap
- }
-
- protected val primitiveTypes =
- Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType,
- ShortType, DateType, TimestampType, BinaryType)
-
- protected[sql] def toHiveString(a: (Any, DataType)): String = a match {
- case (struct: Row, StructType(fields)) =>
- struct.toSeq.zip(fields).map {
- case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
- }.mkString("{", ",", "}")
- case (seq: Seq[_], ArrayType(typ, _)) =>
- seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
- case (map: Map[_, _], MapType(kType, vType, _)) =>
- map.map {
- case (key, value) =>
- toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
- }.toSeq.sorted.mkString("{", ",", "}")
- case (null, _) => "NULL"
- case (d: Int, DateType) => new DateWritable(d).toString
- case (t: Timestamp, TimestampType) => new TimestampWritable(t).toString
- case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)
- case (decimal: java.math.BigDecimal, DecimalType()) =>
- // Hive strips trailing zeros so use its toString
- HiveDecimal.create(decimal).toString
- case (other, tpe) if primitiveTypes contains tpe => other.toString
- }
-
- /** Hive outputs fields of structs slightly differently than top level attributes. */
- protected def toHiveStructString(a: (Any, DataType)): String = a match {
- case (struct: Row, StructType(fields)) =>
- struct.toSeq.zip(fields).map {
- case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
- }.mkString("{", ",", "}")
- case (seq: Seq[_], ArrayType(typ, _)) =>
- seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
- case (map: Map[_, _], MapType(kType, vType, _)) =>
- map.map {
- case (key, value) =>
- toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
- }.toSeq.sorted.mkString("{", ",", "}")
- case (null, _) => "null"
- case (s: String, StringType) => "\"" + s + "\""
- case (decimal, DecimalType()) => decimal.toString
- case (other, tpe) if primitiveTypes contains tpe => other.toString
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/df1953f0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
index 0ee34f0..ed1340d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
@@ -53,7 +53,7 @@ protected[hive] class HiveQueryExecution(ctx: SQLContext, logicalPlan: LogicalPl
// We need the types so we can output struct field names
val types = analyzed.output.map(_.dataType)
// Reformat to match hive tab delimited output.
- result.map(_.zip(types).map(HiveContext.toHiveString)).map(_.mkString("\t")).toSeq
+ result.map(_.zip(types).map(HiveUtils.toHiveString)).map(_.mkString("\t")).toSeq
}
override def simpleString: String =
http://git-wip-us.apache.org/repos/asf/spark/blob/df1953f0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 4db0d78..d8cc057 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -190,7 +190,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
* SerDe.
*/
def convertMetastoreParquet: Boolean = {
- conf.getConf(HiveContext.CONVERT_METASTORE_PARQUET)
+ conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)
}
/**
@@ -200,7 +200,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
* This configuration is only effective when "spark.sql.hive.convertMetastoreParquet" is true.
*/
def convertMetastoreParquetWithSchemaMerging: Boolean = {
- conf.getConf(HiveContext.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
+ conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
}
/**
@@ -209,7 +209,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
* SerDe.
*/
def convertMetastoreOrc: Boolean = {
- conf.getConf(HiveContext.CONVERT_METASTORE_ORC)
+ conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
}
/**
@@ -225,14 +225,14 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
* and no SerDe is specified (no ROW FORMAT SERDE clause).
*/
def convertCTAS: Boolean = {
- conf.getConf(HiveContext.CONVERT_CTAS)
+ conf.getConf(HiveUtils.CONVERT_CTAS)
}
/**
* When true, Hive Thrift server will execute SQL queries asynchronously using a thread pool."
*/
def hiveThriftServerAsync: Boolean = {
- conf.getConf(HiveContext.HIVE_THRIFT_SERVER_ASYNC)
+ conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
}
def hiveThriftServerSingleSession: Boolean = {
http://git-wip-us.apache.org/repos/asf/spark/blob/df1953f0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
index 11097c3..1d8ce30 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
@@ -34,7 +34,7 @@ private[hive] class HiveSharedState(override val sparkContext: SparkContext)
* A Hive client used for execution.
*/
val executionHive: HiveClientImpl = {
- HiveContext.newClientForExecution(sparkContext.conf, sparkContext.hadoopConfiguration)
+ HiveUtils.newClientForExecution(sparkContext.conf, sparkContext.hadoopConfiguration)
}
/**
@@ -42,7 +42,7 @@ private[hive] class HiveSharedState(override val sparkContext: SparkContext)
*/
// This needs to be a lazy val at here because TestHiveSharedState is overriding it.
lazy val metadataHive: HiveClient = {
- HiveContext.newClientForMetadata(sparkContext.conf, sparkContext.hadoopConfiguration)
+ HiveUtils.newClientForMetadata(sparkContext.conf, sparkContext.hadoopConfiguration)
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/df1953f0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
new file mode 100644
index 0000000..44d3cc2
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+import java.net.{URL, URLClassLoader}
+import java.nio.charset.StandardCharsets
+import java.sql.Timestamp
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+import scala.language.implicitConversions
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.common.`type`.HiveDecimal
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
+import org.apache.hadoop.util.VersionInfo
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql._
+import org.apache.spark.sql.hive.client._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf._
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+/**
+ * An instance of the Spark SQL execution engine that integrates with data stored in Hive.
+ * Configuration for Hive is read from hive-site.xml on the classpath.
+ *
+ * @since 1.0.0
+ */
+class HiveContext private[hive](
+ @transient private val sparkSession: SparkSession,
+ isRootContext: Boolean)
+ extends SQLContext(sparkSession, isRootContext) with Logging {
+
+ self =>
+
+ def this(sc: SparkContext) = {
+ this(new SparkSession(HiveUtils.withHiveExternalCatalog(sc)), true)
+ }
+
+ def this(sc: JavaSparkContext) = this(sc.sc)
+
+ /**
+ * Returns a new HiveContext as a new session, which will have a separate SQLConf, UDF/UDAF
+ * registry, temporary tables and SessionState, but will share the same CacheManager,
+ * IsolatedClientLoader and Hive client (both execution and metadata) with the existing
+ * HiveContext.
+ */
+ override def newSession(): HiveContext = {
+ new HiveContext(sparkSession.newSession(), isRootContext = false)
+ }
+
+ protected[sql] override def sessionState: HiveSessionState = {
+ sparkSession.sessionState.asInstanceOf[HiveSessionState]
+ }
+
+ protected[sql] override def sharedState: HiveSharedState = {
+ sparkSession.sharedState.asInstanceOf[HiveSharedState]
+ }
+
+}
+
+
+private[spark] object HiveUtils extends Logging {
+
+ def withHiveExternalCatalog(sc: SparkContext): SparkContext = {
+ sc.conf.set(CATALOG_IMPLEMENTATION.key, "hive")
+ sc
+ }
+
+ /** The version of hive used internally by Spark SQL. */
+ val hiveExecutionVersion: String = "1.2.1"
+
+ val HIVE_METASTORE_VERSION = SQLConfigBuilder("spark.sql.hive.metastore.version")
+ .doc("Version of the Hive metastore. Available options are " +
+ s"<code>0.12.0</code> through <code>$hiveExecutionVersion</code>.")
+ .stringConf
+ .createWithDefault(hiveExecutionVersion)
+
+ val HIVE_EXECUTION_VERSION = SQLConfigBuilder("spark.sql.hive.version")
+ .doc("Version of Hive used internally by Spark SQL.")
+ .stringConf
+ .createWithDefault(hiveExecutionVersion)
+
+ val HIVE_METASTORE_JARS = SQLConfigBuilder("spark.sql.hive.metastore.jars")
+ .doc(s"""
+ | Location of the jars that should be used to instantiate the HiveMetastoreClient.
+ | This property can be one of three options:
+ | 1. "builtin"
+ | Use Hive ${hiveExecutionVersion}, which is bundled with the Spark assembly when
+ | <code>-Phive</code> is enabled. When this option is chosen,
+ | <code>spark.sql.hive.metastore.version</code> must be either
+ | <code>${hiveExecutionVersion}</code> or not defined.
+ | 2. "maven"
+ | Use Hive jars of specified version downloaded from Maven repositories.
+ | 3. A classpath in the standard format for both Hive and Hadoop.
+ """.stripMargin)
+ .stringConf
+ .createWithDefault("builtin")
+
+ val CONVERT_METASTORE_PARQUET = SQLConfigBuilder("spark.sql.hive.convertMetastoreParquet")
+ .doc("When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of " +
+ "the built in support.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING =
+ SQLConfigBuilder("spark.sql.hive.convertMetastoreParquet.mergeSchema")
+ .doc("When true, also tries to merge possibly different but compatible Parquet schemas in " +
+ "different Parquet data files. This configuration is only effective " +
+ "when \"spark.sql.hive.convertMetastoreParquet\" is true.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS")
+ .doc("When true, a table created by a Hive CTAS statement (no USING clause) will be " +
+ "converted to a data source table, using the data source set by spark.sql.sources.default.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val CONVERT_METASTORE_ORC = SQLConfigBuilder("spark.sql.hive.convertMetastoreOrc")
+ .doc("When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " +
+ "the built in support.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val HIVE_METASTORE_SHARED_PREFIXES = SQLConfigBuilder("spark.sql.hive.metastore.sharedPrefixes")
+ .doc("A comma separated list of class prefixes that should be loaded using the classloader " +
+ "that is shared between Spark SQL and a specific version of Hive. An example of classes " +
+ "that should be shared is JDBC drivers that are needed to talk to the metastore. Other " +
+ "classes that need to be shared are those that interact with classes that are already " +
+ "shared. For example, custom appenders that are used by log4j.")
+ .stringConf
+ .toSequence
+ .createWithDefault(jdbcPrefixes)
+
+ private def jdbcPrefixes = Seq(
+ "com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc")
+
+ val HIVE_METASTORE_BARRIER_PREFIXES = SQLConfigBuilder("spark.sql.hive.metastore.barrierPrefixes")
+ .doc("A comma separated list of class prefixes that should explicitly be reloaded for each " +
+ "version of Hive that Spark SQL is communicating with. For example, Hive UDFs that are " +
+ "declared in a prefix that typically would be shared (i.e. <code>org.apache.spark.*</code>).")
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
+
+ val HIVE_THRIFT_SERVER_ASYNC = SQLConfigBuilder("spark.sql.hive.thriftServer.async")
+ .doc("When set to true, Hive Thrift server executes SQL queries in an asynchronous way.")
+ .booleanConf
+ .createWithDefault(true)
+
+ /**
+ * The version of the hive client that will be used to communicate with the metastore. Note that
+ * this does not necessarily need to be the same version of Hive that is used internally by
+ * Spark SQL for execution.
+ */
+ private def hiveMetastoreVersion(conf: SQLConf): String = {
+ conf.getConf(HIVE_METASTORE_VERSION)
+ }
+
+ /**
+ * The location of the jars that should be used to instantiate the HiveMetastoreClient. This
+ * property can be one of three options:
+ * - a classpath in the standard format for both hive and hadoop.
+ * - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This
+ * option is only valid when using the execution version of Hive.
+ * - maven - download the correct version of hive on demand from maven.
+ */
+ private def hiveMetastoreJars(conf: SQLConf): String = {
+ conf.getConf(HIVE_METASTORE_JARS)
+ }
+
+ /**
+ * A comma separated list of class prefixes that should be loaded using the classloader that
+ * is shared between Spark SQL and a specific version of Hive. An example of classes that should
+ * be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need
+ * to be shared are those that interact with classes that are already shared. For example,
+ * custom appenders that are used by log4j.
+ */
+ private def hiveMetastoreSharedPrefixes(conf: SQLConf): Seq[String] = {
+ conf.getConf(HIVE_METASTORE_SHARED_PREFIXES).filterNot(_ == "")
+ }
+
+ /**
+ * A comma separated list of class prefixes that should explicitly be reloaded for each version
+ * of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a
+ * prefix that typically would be shared (i.e. org.apache.spark.*)
+ */
+ private def hiveMetastoreBarrierPrefixes(conf: SQLConf): Seq[String] = {
+ conf.getConf(HIVE_METASTORE_BARRIER_PREFIXES).filterNot(_ == "")
+ }
+
+ /**
+ * Configurations needed to create a [[HiveClient]].
+ */
+ private[hive] def hiveClientConfigurations(hiveconf: HiveConf): Map[String, String] = {
+ // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
+ // of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks backwards-
+ // compatibility when users are trying to connect to a Hive metastore of a lower version,
+ // because these options are expected to be integral values in lower versions of Hive.
+ //
+ // Here we enumerate all time `ConfVar`s and convert their values to numeric strings according
+ // to their output time units.
+ Seq(
+ ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY -> TimeUnit.SECONDS,
+ ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT -> TimeUnit.SECONDS,
+ ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME -> TimeUnit.SECONDS,
+ ConfVars.HMSHANDLERINTERVAL -> TimeUnit.MILLISECONDS,
+ ConfVars.METASTORE_EVENT_DB_LISTENER_TTL -> TimeUnit.SECONDS,
+ ConfVars.METASTORE_EVENT_CLEAN_FREQ -> TimeUnit.SECONDS,
+ ConfVars.METASTORE_EVENT_EXPIRY_DURATION -> TimeUnit.SECONDS,
+ ConfVars.METASTORE_AGGREGATE_STATS_CACHE_TTL -> TimeUnit.SECONDS,
+ ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_WRITER_WAIT -> TimeUnit.MILLISECONDS,
+ ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_READER_WAIT -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT -> TimeUnit.SECONDS,
+ ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVE_STATS_JDBC_TIMEOUT -> TimeUnit.SECONDS,
+ ConfVars.HIVE_STATS_RETRIES_WAIT -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES -> TimeUnit.SECONDS,
+ ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVE_TXN_TIMEOUT -> TimeUnit.SECONDS,
+ ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT -> TimeUnit.SECONDS,
+ ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL -> TimeUnit.SECONDS,
+ ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS,
+ ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE -> TimeUnit.SECONDS,
+ ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT -> TimeUnit.SECONDS,
+ ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS,
+ ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT -> TimeUnit.SECONDS,
+ ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME -> TimeUnit.SECONDS,
+ ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT -> TimeUnit.MILLISECONDS,
+ ConfVars.SERVER_READ_SOCKET_TIMEOUT -> TimeUnit.SECONDS,
+ ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL -> TimeUnit.MILLISECONDS,
+ ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT -> TimeUnit.SECONDS,
+ ConfVars.SPARK_JOB_MONITOR_TIMEOUT -> TimeUnit.SECONDS,
+ ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT -> TimeUnit.MILLISECONDS,
+ ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT -> TimeUnit.MILLISECONDS
+ ).map { case (confVar, unit) =>
+ confVar.varname -> hiveconf.getTimeVar(confVar, unit).toString
+ }.toMap
+ }
+
+ /**
+ * Create a [[HiveClient]] used for execution.
+ *
+ * Currently this must always be Hive 1.2.1, as this is the version of Hive that is packaged
+ * with Spark SQL. This copy of the client is used for execution related tasks like
+ * registering temporary functions or ensuring that the ThreadLocal SessionState is
+ * correctly populated. This copy of Hive is *not* used for storing persistent metadata,
+ * and only points to a dummy metastore in a temporary directory.
+ */
+ protected[hive] def newClientForExecution(
+ conf: SparkConf,
+ hadoopConf: Configuration): HiveClientImpl = {
+ logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
+ val loader = new IsolatedClientLoader(
+ version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
+ sparkConf = conf,
+ execJars = Seq(),
+ hadoopConf = hadoopConf,
+ config = newTemporaryConfiguration(useInMemoryDerby = true),
+ isolationOn = false,
+ baseClassLoader = Utils.getContextOrSparkClassLoader)
+ loader.createClient().asInstanceOf[HiveClientImpl]
+ }
+
+ /**
+ * Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore.
+ *
+ * The version of the Hive client that is used here must match the metastore that is configured
+ * in the hive-site.xml file.
+ */
+ protected[hive] def newClientForMetadata(
+ conf: SparkConf,
+ hadoopConf: Configuration): HiveClient = {
+ val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
+ val configurations = hiveClientConfigurations(hiveConf)
+ newClientForMetadata(conf, hiveConf, hadoopConf, configurations)
+ }
+
+ protected[hive] def newClientForMetadata(
+ conf: SparkConf,
+ hiveConf: HiveConf,
+ hadoopConf: Configuration,
+ configurations: Map[String, String]): HiveClient = {
+ val sqlConf = new SQLConf
+ sqlConf.setConf(SQLContext.getSQLProperties(conf))
+ val hiveMetastoreVersion = HiveUtils.hiveMetastoreVersion(sqlConf)
+ val hiveMetastoreJars = HiveUtils.hiveMetastoreJars(sqlConf)
+ val hiveMetastoreSharedPrefixes = HiveUtils.hiveMetastoreSharedPrefixes(sqlConf)
+ val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf)
+ val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)
+
+ val defaultWarehouseLocation = hiveConf.get("hive.metastore.warehouse.dir")
+ logInfo("default warehouse location is " + defaultWarehouseLocation)
+
+ // `configurations` goes second to override other settings.
+ val allConfig = hiveConf.asScala.map(e => e.getKey -> e.getValue).toMap ++ configurations
+
+ val isolatedLoader = if (hiveMetastoreJars == "builtin") {
+ if (hiveExecutionVersion != hiveMetastoreVersion) {
+ throw new IllegalArgumentException(
+ "Builtin jars can only be used when hive execution version == hive metastore version. " +
+ s"Execution: $hiveExecutionVersion != Metastore: $hiveMetastoreVersion. " +
+ "Specify a vaild path to the correct hive jars using $HIVE_METASTORE_JARS " +
+ s"or change ${HIVE_METASTORE_VERSION.key} to $hiveExecutionVersion.")
+ }
+
+ // We recursively find all jars in the class loader chain,
+ // starting from the given classLoader.
+ def allJars(classLoader: ClassLoader): Array[URL] = classLoader match {
+ case null => Array.empty[URL]
+ case urlClassLoader: URLClassLoader =>
+ urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent)
+ case other => allJars(other.getParent)
+ }
+
+ val classLoader = Utils.getContextOrSparkClassLoader
+ val jars = allJars(classLoader)
+ if (jars.length == 0) {
+ throw new IllegalArgumentException(
+ "Unable to locate hive jars to connect to metastore. " +
+ "Please set spark.sql.hive.metastore.jars.")
+ }
+
+ logInfo(
+ s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
+ new IsolatedClientLoader(
+ version = metaVersion,
+ sparkConf = conf,
+ hadoopConf = hadoopConf,
+ execJars = jars.toSeq,
+ config = allConfig,
+ isolationOn = true,
+ barrierPrefixes = hiveMetastoreBarrierPrefixes,
+ sharedPrefixes = hiveMetastoreSharedPrefixes)
+ } else if (hiveMetastoreJars == "maven") {
+ // TODO: Support for loading the jars from an already downloaded location.
+ logInfo(
+ s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
+ IsolatedClientLoader.forVersion(
+ hiveMetastoreVersion = hiveMetastoreVersion,
+ hadoopVersion = VersionInfo.getVersion,
+ sparkConf = conf,
+ hadoopConf = hadoopConf,
+ config = allConfig,
+ barrierPrefixes = hiveMetastoreBarrierPrefixes,
+ sharedPrefixes = hiveMetastoreSharedPrefixes)
+ } else {
+ // Convert to files and expand any directories.
+ val jars =
+ hiveMetastoreJars
+ .split(File.pathSeparator)
+ .flatMap {
+ case path if new File(path).getName == "*" =>
+ val files = new File(path).getParentFile.listFiles()
+ if (files == null) {
+ logWarning(s"Hive jar path '$path' does not exist.")
+ Nil
+ } else {
+ files.filter(_.getName.toLowerCase.endsWith(".jar"))
+ }
+ case path =>
+ new File(path) :: Nil
+ }
+ .map(_.toURI.toURL)
+
+ logInfo(
+ s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " +
+ s"using ${jars.mkString(":")}")
+ new IsolatedClientLoader(
+ version = metaVersion,
+ sparkConf = conf,
+ hadoopConf = hadoopConf,
+ execJars = jars.toSeq,
+ config = allConfig,
+ isolationOn = true,
+ barrierPrefixes = hiveMetastoreBarrierPrefixes,
+ sharedPrefixes = hiveMetastoreSharedPrefixes)
+ }
+ isolatedLoader.createClient()
+ }
+
+ /** Constructs a configuration for hive, where the metastore is located in a temp directory. */
+ def newTemporaryConfiguration(useInMemoryDerby: Boolean): Map[String, String] = {
+ val withInMemoryMode = if (useInMemoryDerby) "memory:" else ""
+
+ val tempDir = Utils.createTempDir()
+ val localMetastore = new File(tempDir, "metastore")
+ val propMap: HashMap[String, String] = HashMap()
+ // We have to mask all properties in hive-site.xml that relate to the metastore
+ // data source, since we use a local metastore here.
+ HiveConf.ConfVars.values().foreach { confvar =>
+ if (confvar.varname.contains("datanucleus") || confvar.varname.contains("jdo")
+ || confvar.varname.contains("hive.metastore.rawstore.impl")) {
+ propMap.put(confvar.varname, confvar.getDefaultExpr())
+ }
+ }
+ propMap.put(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, localMetastore.toURI.toString)
+ propMap.put(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
+ s"jdbc:derby:${withInMemoryMode};databaseName=${localMetastore.getAbsolutePath};create=true")
+ propMap.put("datanucleus.rdbms.datastoreAdapterClassName",
+ "org.datanucleus.store.rdbms.adapter.DerbyAdapter")
+
+ // SPARK-11783: When "hive.metastore.uris" is set, the metastore connection mode will be
+ // remote (https://cwiki.apache.org/confluence/display/Hive/AdminManual+MetastoreAdmin
+ // mentions that "If hive.metastore.uris is empty local mode is assumed, remote otherwise").
+ // Remote means that the metastore server is running in its own process.
+ // When the mode is remote, configurations like "javax.jdo.option.ConnectionURL" will not be
+ // used (they are only read by the remote metastore server that talks to the database).
+ // Because execution Hive should always connect to an embedded Derby metastore, we have
+ // to clear hive.metastore.uris so that the execution Hive client connects to the actual
+ // embedded Derby metastore instead of the remote one. Searching for
+ // HiveConf.ConfVars.METASTOREURIS in Hive's HiveConf source shows that local metastore
+ // mode is enabled only when hive.metastore.uris is unset.
+ propMap.put(ConfVars.METASTOREURIS.varname, "")
+
+ propMap.toMap
+ }
+
+ protected val primitiveTypes =
+ Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType,
+ ShortType, DateType, TimestampType, BinaryType)
+
+ protected[sql] def toHiveString(a: (Any, DataType)): String = a match {
+ case (struct: Row, StructType(fields)) =>
+ struct.toSeq.zip(fields).map {
+ case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
+ }.mkString("{", ",", "}")
+ case (seq: Seq[_], ArrayType(typ, _)) =>
+ seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
+ case (map: Map[_, _], MapType(kType, vType, _)) =>
+ map.map {
+ case (key, value) =>
+ toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
+ }.toSeq.sorted.mkString("{", ",", "}")
+ case (null, _) => "NULL"
+ case (d: Int, DateType) => new DateWritable(d).toString
+ case (t: Timestamp, TimestampType) => new TimestampWritable(t).toString
+ case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)
+ case (decimal: java.math.BigDecimal, DecimalType()) =>
+ // Hive strips trailing zeros so use its toString
+ HiveDecimal.create(decimal).toString
+ case (other, tpe) if primitiveTypes contains tpe => other.toString
+ }
+
+ /** Hive outputs fields of structs slightly differently than top level attributes. */
+ protected def toHiveStructString(a: (Any, DataType)): String = a match {
+ case (struct: Row, StructType(fields)) =>
+ struct.toSeq.zip(fields).map {
+ case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
+ }.mkString("{", ",", "}")
+ case (seq: Seq[_], ArrayType(typ, _)) =>
+ seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
+ case (map: Map[_, _], MapType(kType, vType, _)) =>
+ map.map {
+ case (key, value) =>
+ toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
+ }.toSeq.sorted.mkString("{", ",", "}")
+ case (null, _) => "null"
+ case (s: String, StringType) => "\"" + s + "\""
+ case (decimal, DecimalType()) => decimal.toString
+ case (other, tpe) if primitiveTypes contains tpe => other.toString
+ }
+}
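A quick, hedged illustration of the formatting helpers above; this is not part of the patch. Both helpers are protected, so the calls below assume code living in the org.apache.spark.sql package tree (as the hive module's own classes do), and the expected outputs in the comments follow directly from the pattern matches.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.hive.HiveUtils
    import org.apache.spark.sql.types._

    // Top-level primitives print bare, and a top-level null prints as "NULL".
    HiveUtils.toHiveString(("abc", StringType))  // abc
    HiveUtils.toHiveString((null, StringType))   // NULL

    // Struct fields go through toHiveStructString, which quotes strings and
    // lowercases null, matching how Hive prints nested values.
    val schema = StructType(Seq(
      StructField("s", StringType),
      StructField("i", IntegerType)))
    HiveUtils.toHiveString((Row("abc", 1), schema))  // {"s":"abc","i":1}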
http://git-wip-us.apache.org/repos/asf/spark/blob/df1953f0/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 6a20d7c..e95069e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -23,8 +23,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
import org.apache.hadoop.hive.ql.exec.Utilities
-import org.apache.hadoop.hive.ql.metadata.{HiveUtils, Partition => HivePartition,
- Table => HiveTable}
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters,
@@ -300,7 +299,8 @@ private[hive] object HiveTableUtil {
def configureJobPropertiesForStorageHandler(
tableDesc: TableDesc, jobConf: JobConf, input: Boolean) {
val property = tableDesc.getProperties.getProperty(META_TABLE_STORAGE)
- val storageHandler = HiveUtils.getStorageHandler(jobConf, property)
+ val storageHandler =
+ org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(jobConf, property)
if (storageHandler != null) {
val jobProperties = new util.LinkedHashMap[String, String]
if (input) {
http://git-wip-us.apache.org/repos/asf/spark/blob/df1953f0/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 7e0d1b4..0380d23 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -32,7 +32,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.util.{MutableURLClassLoader, Utils}
/** Factory for `IsolatedClientLoader` with specific versions of hive. */
@@ -263,7 +263,7 @@ private[hive] class IsolatedClientLoader(
throw new ClassNotFoundException(
s"$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n" +
"Please make sure that jars for your version of hive and hadoop are included in the " +
- s"paths passed to ${HiveContext.HIVE_METASTORE_JARS}.", e)
+ s"paths passed to ${HiveUtils.HIVE_METASTORE_JARS}.", e)
} else {
throw e
}
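For reference, the config that error message points at can take three shapes, per the newClientForMetadata branches earlier in this commit. A hedged sketch, where the version and path are illustrative:

    import org.apache.spark.SparkConf

    // "builtin" (the default) reuses the Hive jars already on Spark's classpath
    // and requires the metastore version to equal the execution version.
    // "maven" downloads the requested version at runtime. Anything else is
    // treated as a classpath, with "dir/*" expanding to the jars in dir.
    val conf = new SparkConf()
      .set("spark.sql.hive.metastore.version", "1.2.1")
      .set("spark.sql.hive.metastore.jars", "/opt/hive/lib/*")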
http://git-wip-us.apache.org/repos/asf/spark/blob/df1953f0/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 741e3bd..7f8f629 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -74,7 +74,7 @@ class TestHiveContext(@transient val sparkSession: TestHiveSparkSession, isRootC
extends HiveContext(sparkSession, isRootContext) {
def this(sc: SparkContext) {
- this(new TestHiveSparkSession(HiveContext.withHiveExternalCatalog(sc)), true)
+ this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc)), true)
}
override def newSession(): TestHiveContext = {
@@ -117,7 +117,7 @@ private[hive] class TestHiveSparkSession(
sc,
Utils.createTempDir(namePrefix = "warehouse"),
TestHiveContext.makeScratchDir(),
- HiveContext.newTemporaryConfiguration(useInMemoryDerby = false),
+ HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false),
None)
}
@@ -576,7 +576,7 @@ private[hive] object TestHiveContext {
scratchDirPath: File,
metastoreTemporaryConf: Map[String, String]): HiveClient = {
val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
- HiveContext.newClientForMetadata(
+ HiveUtils.newClientForMetadata(
conf,
hiveConf,
hadoopConf,
@@ -591,7 +591,7 @@ private[hive] object TestHiveContext {
warehousePath: File,
scratchDirPath: File,
metastoreTemporaryConf: Map[String, String]): Map[String, String] = {
- HiveContext.hiveClientConfigurations(hiveconf) ++ metastoreTemporaryConf ++ Map(
+ HiveUtils.hiveClientConfigurations(hiveconf) ++ metastoreTemporaryConf ++ Map(
ConfVars.METASTOREWAREHOUSE.varname -> warehousePath.toURI.toString,
ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString,
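TestHive builds its throwaway metastore from HiveUtils.newTemporaryConfiguration; a hedged sketch of calling it directly, with the keys shown following from the implementation earlier in the diff:

    import org.apache.spark.sql.hive.HiveUtils

    // With useInMemoryDerby = true the Derby database lives in memory and
    // hive.metastore.uris is cleared to force local (embedded) mode.
    val tempConf: Map[String, String] =
      HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true)

    tempConf("hive.metastore.uris")              // ""
    tempConf("javax.jdo.option.ConnectionURL")
    // jdbc:derby:memory:;databaseName=<temp dir>/metastore;create=true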
http://git-wip-us.apache.org/repos/asf/spark/blob/df1953f0/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 84285b7..cb60a2c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -31,7 +31,7 @@ class HiveExternalCatalogSuite extends CatalogTestCases {
private val client: HiveClient = {
// We create a metastore at a temp location to avoid any potential
// conflict of having multiple connections to a single derby instance.
- HiveContext.newClientForExecution(new SparkConf, new Configuration)
+ HiveUtils.newClientForExecution(new SparkConf, new Configuration)
}
protected override val utils: CatalogTestUtils = new CatalogTestUtils {
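As in this suite, a standalone execution client can be spun up against its own in-memory Derby instance. A hedged sketch, assuming (as the suite does) code in the org.apache.spark.sql.hive package:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf

    // Each call gets a fresh in-memory Derby metastore, so concurrent tests
    // don't fight over a single Derby directory.
    val client = HiveUtils.newClientForExecution(new SparkConf, new Configuration)
    client.listTables("default")  // empty on a brand-new metastore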
http://git-wip-us.apache.org/repos/asf/spark/blob/df1953f0/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index bbe135b..dc87daa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -550,7 +550,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}
test("scan a parquet table created through a CTAS statement") {
- withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "true") {
+ withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true") {
withTempTable("jt") {
(1 to 10).map(i => i -> s"str$i").toDF("a", "b").registerTempTable("jt")
http://git-wip-us.apache.org/repos/asf/spark/blob/df1953f0/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
index a9823ae..d789145 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -53,7 +53,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
val rows = row :: Row(Seq.fill(row.length)(null): _*) :: Nil
// Don't convert Hive metastore Parquet tables to let Hive write those Parquet files.
- withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
+ withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") {
withTempTable("data") {
val fields = hiveTypes.zipWithIndex.map { case (typ, index) => s" col_$index $typ" }
http://git-wip-us.apache.org/repos/asf/spark/blob/df1953f0/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 8b07192..e0288ff 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.tags.ExtendedHiveTest
import org.apache.spark.util.Utils
@@ -62,7 +62,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
test("success sanity check") {
val badClient = IsolatedClientLoader.forVersion(
- hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
+ hiveMetastoreVersion = HiveUtils.hiveExecutionVersion,
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
hadoopConf = new Configuration(),
@@ -76,7 +76,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
val hadoopConf = new Configuration();
hadoopConf.set("test", "success")
val client = IsolatedClientLoader.forVersion(
- hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
+ hiveMetastoreVersion = HiveUtils.hiveExecutionVersion,
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
hadoopConf = hadoopConf,
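The same factory can be used outside the suite. A hedged sketch, assuming the remaining forVersion parameters take their defaults and that Maven artifacts for the requested version are reachable (the call resolves them at runtime):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.util.VersionInfo
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.hive.HiveUtils
    import org.apache.spark.sql.hive.client.IsolatedClientLoader

    // Resolve and load the builtin execution version in an isolated loader,
    // then materialize a client from it.
    val client = IsolatedClientLoader.forVersion(
      hiveMetastoreVersion = HiveUtils.hiveExecutionVersion,
      hadoopVersion = VersionInfo.getVersion,
      sparkConf = new SparkConf,
      hadoopConf = new Configuration()).createClient()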
http://git-wip-us.apache.org/repos/asf/spark/blob/df1953f0/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 345ee8e..2e14aaa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}
+import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
@@ -351,7 +351,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
val originalConf = sessionState.convertCTAS
- setConf(HiveContext.CONVERT_CTAS, true)
+ setConf(HiveUtils.CONVERT_CTAS, true)
try {
sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
@@ -395,7 +395,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
checkRelation("ctas1", false)
sql("DROP TABLE ctas1")
} finally {
- setConf(HiveContext.CONVERT_CTAS, originalConf)
+ setConf(HiveUtils.CONVERT_CTAS, originalConf)
sql("DROP TABLE IF EXISTS ctas1")
}
}
@@ -470,7 +470,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
| FROM src
| ORDER BY key, value""".stripMargin).collect()
- withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
+ withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") {
checkExistence(sql("DESC EXTENDED ctas5"), true,
"name:key", "type:string", "name:value", "ctas5",
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
@@ -481,7 +481,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
// use the Hive SerDe for parquet tables
- withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
+ withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") {
checkAnswer(
sql("SELECT key, value FROM ctas5 ORDER BY key, value"),
sql("SELECT key, value FROM src ORDER BY key, value").collect().toSeq)
@@ -732,7 +732,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
read.json(rdd).registerTempTable("data")
val originalConf = sessionState.convertCTAS
- setConf(HiveContext.CONVERT_CTAS, false)
+ setConf(HiveUtils.CONVERT_CTAS, false)
try {
sql("CREATE TABLE explodeTest (key bigInt)")
@@ -751,7 +751,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
sql("DROP TABLE explodeTest")
dropTempTable("data")
} finally {
- setConf(HiveContext.CONVERT_CTAS, originalConf)
+ setConf(HiveUtils.CONVERT_CTAS, originalConf)
}
}
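All of the CONVERT_CTAS call sites above share the same save-and-restore shape; a hedged distillation of that pattern, with an illustrative table name:

    val originalConf = sessionState.convertCTAS
    setConf(HiveUtils.CONVERT_CTAS, true)
    try {
      // With the flag on, a plain CTAS is written as a data source table
      // rather than a Hive SerDe table.
      sql("CREATE TABLE ctas_demo AS SELECT 1 AS id")
    } finally {
      setConf(HiveUtils.CONVERT_CTAS, originalConf)
      sql("DROP TABLE IF EXISTS ctas_demo")
    }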
http://git-wip-us.apache.org/repos/asf/spark/blob/df1953f0/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 5ef8194..4fb78ac 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.internal.SQLConf
@@ -406,7 +406,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
test("SPARK-14070 Use ORC data source for SQL queries on ORC tables") {
withTempPath { dir =>
withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true",
- HiveContext.CONVERT_METASTORE_ORC.key -> "true") {
+ HiveUtils.CONVERT_METASTORE_ORC.key -> "true") {
val path = dir.getCanonicalPath
withTable("dummy_orc") {
http://git-wip-us.apache.org/repos/asf/spark/blob/df1953f0/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 6fa4c33..2984ee9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -174,7 +174,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
(1 to 10).map(i => (i, s"str$i")).toDF("a", "b").registerTempTable("jt")
(1 to 10).map(i => Tuple1(Seq(new Integer(i), null))).toDF("a").registerTempTable("jt_array")
- setConf(HiveContext.CONVERT_METASTORE_PARQUET, true)
+ setConf(HiveUtils.CONVERT_METASTORE_PARQUET, true)
}
override def afterAll(): Unit = {
@@ -186,7 +186,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
"jt",
"jt_array",
"test_parquet")
- setConf(HiveContext.CONVERT_METASTORE_PARQUET, false)
+ setConf(HiveUtils.CONVERT_METASTORE_PARQUET, false)
}
test(s"conversion is working") {
@@ -619,7 +619,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
withTable("array_of_struct") {
val conf = Seq(
- HiveContext.CONVERT_METASTORE_PARQUET.key -> "false",
+ HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
SQLConf.PARQUET_BINARY_AS_STRING.key -> "true",
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false")
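The parquet suites apply the same rename to the conversion flag; a hedged sketch of the scoped-toggle idiom they rely on, with an illustrative table name:

    // Turn conversion off so the read goes through the Hive SerDe rather
    // than Spark's native Parquet data source; the old value is restored
    // when the block exits.
    withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") {
      sql("SELECT * FROM some_hive_parquet_table").collect()
    }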