Posted to commits@spark.apache.org by rx...@apache.org on 2016/04/26 05:54:37 UTC
[3/4] spark git commit: [SPARK-14861][SQL] Replace internal usages of SQLContext with SparkSession
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index bb83676..d3d83b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
import java.sql.Timestamp
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
@@ -39,39 +39,41 @@ import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampT
* While this is not a public class, we should avoid changing the function names for the sake of
* changing them, because a lot of developers use the feature for debugging.
*/
-class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
+class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
// TODO: Move the planner an optimizer into here from SessionState.
- protected def planner = sqlContext.sessionState.planner
-
- def assertAnalyzed(): Unit = try sqlContext.sessionState.analyzer.checkAnalysis(analyzed) catch {
- case e: AnalysisException =>
- val ae = new AnalysisException(e.message, e.line, e.startPosition, Some(analyzed))
- ae.setStackTrace(e.getStackTrace)
- throw ae
+ protected def planner = sparkSession.sessionState.planner
+
+ def assertAnalyzed(): Unit = {
+ try sparkSession.sessionState.analyzer.checkAnalysis(analyzed) catch {
+ case e: AnalysisException =>
+ val ae = new AnalysisException(e.message, e.line, e.startPosition, Some(analyzed))
+ ae.setStackTrace(e.getStackTrace)
+ throw ae
+ }
}
def assertSupported(): Unit = {
- if (sqlContext.conf.getConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+ if (sparkSession.sessionState.conf.getConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
UnsupportedOperationChecker.checkForBatch(analyzed)
}
}
lazy val analyzed: LogicalPlan = {
- SQLContext.setActive(sqlContext)
- sqlContext.sessionState.analyzer.execute(logical)
+ SQLContext.setActive(sparkSession.wrapped)
+ sparkSession.sessionState.analyzer.execute(logical)
}
lazy val withCachedData: LogicalPlan = {
assertAnalyzed()
assertSupported()
- sqlContext.cacheManager.useCachedData(analyzed)
+ sparkSession.cacheManager.useCachedData(analyzed)
}
- lazy val optimizedPlan: LogicalPlan = sqlContext.sessionState.optimizer.execute(withCachedData)
+ lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)
lazy val sparkPlan: SparkPlan = {
- SQLContext.setActive(sqlContext)
+ SQLContext.setActive(sparkSession.wrapped)
planner.plan(ReturnAnswer(optimizedPlan)).next()
}
@@ -93,10 +95,10 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
/** A sequence of rules that will be applied in order to the physical plan before execution. */
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
python.ExtractPythonUDFs,
- PlanSubqueries(sqlContext),
- EnsureRequirements(sqlContext.conf),
- CollapseCodegenStages(sqlContext.conf),
- ReuseExchange(sqlContext.conf))
+ PlanSubqueries(sparkSession),
+ EnsureRequirements(sparkSession.sessionState.conf),
+ CollapseCodegenStages(sparkSession.sessionState.conf),
+ ReuseExchange(sparkSession.sessionState.conf))
protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: Throwable => e.toString }
@@ -110,7 +112,7 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
case ExecutedCommandExec(desc: DescribeTableCommand) =>
// If it is a describe command for a Hive table, we want to have the output format
// be similar with Hive.
- desc.run(sqlContext).map {
+ desc.run(sparkSession).map {
case Row(name: String, dataType: String, comment) =>
Seq(name, dataType,
Option(comment.asInstanceOf[String]).getOrElse(""))
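QueryExecution is now driven by a SparkSession end to end. For illustration, a minimal sketch of constructing one after this change (the helper name and the SQL text are assumptions, not part of the commit):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution

object QueryExecutionSketch {
  def explainPlan(spark: SparkSession, sqlText: String): Unit = {
    val logical = spark.sql(sqlText).queryExecution.logical
    // The constructor takes the SparkSession where it previously took a SQLContext.
    val qe = new QueryExecution(spark, logical)
    qe.assertAnalyzed()        // analysis errors are re-thrown with the analyzed plan attached
    println(qe.executedPlan)   // physical plan after the preparation rules listed above
  }
}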
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 0a11b16..397d66b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd,
SparkListenerSQLExecutionStart}
import org.apache.spark.util.Utils
@@ -38,21 +38,22 @@ private[sql] object SQLExecution {
* we can connect them with an execution.
*/
def withNewExecutionId[T](
- sqlContext: SQLContext, queryExecution: QueryExecution)(body: => T): T = {
- val sc = sqlContext.sparkContext
+ sparkSession: SparkSession,
+ queryExecution: QueryExecution)(body: => T): T = {
+ val sc = sparkSession.sparkContext
val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
if (oldExecutionId == null) {
val executionId = SQLExecution.nextExecutionId
sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
val r = try {
val callSite = Utils.getCallSite()
- sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
+ sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
try {
body
} finally {
- sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionEnd(
+ sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
}
} finally {
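The execution-id wrapper now threads a SparkSession rather than a SQLContext, so the start/end events posted to the listener bus come from that session's SparkContext. A minimal sketch of a caller (hypothetical helper; SQLExecution is private[sql], so it would live inside the org.apache.spark.sql package):

package org.apache.spark.sql

import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}

object ExecutionIdSketch {
  // Runs `body` with a fresh execution id set as a SparkContext local property,
  // so the SQL UI can group the resulting jobs under one execution.
  def runTracked[T](sparkSession: SparkSession, qe: QueryExecution)(body: => T): T =
    SQLExecution.withNewExecutionId(sparkSession, qe) {
      body
    }
}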
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index e28e456..861ff3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -18,12 +18,10 @@
package org.apache.spark.sql.execution
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
-import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
-import scala.util.control.NonFatal
import org.apache.spark.{broadcast, SparkEnv}
import org.apache.spark.internal.Logging
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
index e6c5351..b6f7808 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
@@ -21,7 +21,7 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
@@ -35,8 +35,8 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
*/
case class AnalyzeTable(tableName: String) extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val sessionState = sqlContext.sessionState
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val sessionState = sparkSession.sessionState
val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
@@ -77,7 +77,7 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand {
catalogTable.storage.locationUri.map { p =>
val path = new Path(p)
try {
- val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf)
+ val fs = path.getFileSystem(sparkSession.sessionState.hadoopConf)
calculateTableSize(fs, path)
} catch {
case NonFatal(e) =>
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
index 39e441f..bf66ea4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.command
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.StringType
@@ -29,7 +29,7 @@ case class HiveNativeCommand(sql: String) extends RunnableCommand {
override def output: Seq[AttributeReference] =
Seq(AttributeReference("result", StringType, nullable = false)())
- override def run(sqlContext: SQLContext): Seq[Row] = {
- sqlContext.sessionState.runNativeSql(sql).map(Row(_))
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ sparkSession.sessionState.runNativeSql(sql).map(Row(_))
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index 4daf9e9..952a0d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
import java.util.NoSuchElementException
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -43,10 +43,10 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
schema.toAttributes
}
- private val (_output, runFunc): (Seq[Attribute], SQLContext => Seq[Row]) = kv match {
+ private val (_output, runFunc): (Seq[Attribute], SparkSession => Seq[Row]) = kv match {
// Configures the deprecated "mapred.reduce.tasks" property.
case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) =>
- val runFunc = (sqlContext: SQLContext) => {
+ val runFunc = (sparkSession: SparkSession) => {
logWarning(
s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} instead.")
@@ -56,14 +56,14 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
"determining the number of reducers is not supported."
throw new IllegalArgumentException(msg)
} else {
- sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS.key, value)
+ sparkSession.setConf(SQLConf.SHUFFLE_PARTITIONS.key, value)
Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, value))
}
}
(keyValueOutput, runFunc)
case Some((SQLConf.Deprecated.EXTERNAL_SORT, Some(value))) =>
- val runFunc = (sqlContext: SQLContext) => {
+ val runFunc = (sparkSession: SparkSession) => {
logWarning(
s"Property ${SQLConf.Deprecated.EXTERNAL_SORT} is deprecated and will be ignored. " +
s"External sort will continue to be used.")
@@ -72,7 +72,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
(keyValueOutput, runFunc)
case Some((SQLConf.Deprecated.USE_SQL_AGGREGATE2, Some(value))) =>
- val runFunc = (sqlContext: SQLContext) => {
+ val runFunc = (sparkSession: SparkSession) => {
logWarning(
s"Property ${SQLConf.Deprecated.USE_SQL_AGGREGATE2} is deprecated and " +
s"will be ignored. ${SQLConf.Deprecated.USE_SQL_AGGREGATE2} will " +
@@ -82,7 +82,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
(keyValueOutput, runFunc)
case Some((SQLConf.Deprecated.TUNGSTEN_ENABLED, Some(value))) =>
- val runFunc = (sqlContext: SQLContext) => {
+ val runFunc = (sparkSession: SparkSession) => {
logWarning(
s"Property ${SQLConf.Deprecated.TUNGSTEN_ENABLED} is deprecated and " +
s"will be ignored. Tungsten will continue to be used.")
@@ -91,7 +91,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
(keyValueOutput, runFunc)
case Some((SQLConf.Deprecated.CODEGEN_ENABLED, Some(value))) =>
- val runFunc = (sqlContext: SQLContext) => {
+ val runFunc = (sparkSession: SparkSession) => {
logWarning(
s"Property ${SQLConf.Deprecated.CODEGEN_ENABLED} is deprecated and " +
s"will be ignored. Codegen will continue to be used.")
@@ -100,7 +100,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
(keyValueOutput, runFunc)
case Some((SQLConf.Deprecated.UNSAFE_ENABLED, Some(value))) =>
- val runFunc = (sqlContext: SQLContext) => {
+ val runFunc = (sparkSession: SparkSession) => {
logWarning(
s"Property ${SQLConf.Deprecated.UNSAFE_ENABLED} is deprecated and " +
s"will be ignored. Unsafe mode will continue to be used.")
@@ -109,7 +109,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
(keyValueOutput, runFunc)
case Some((SQLConf.Deprecated.SORTMERGE_JOIN, Some(value))) =>
- val runFunc = (sqlContext: SQLContext) => {
+ val runFunc = (sparkSession: SparkSession) => {
logWarning(
s"Property ${SQLConf.Deprecated.SORTMERGE_JOIN} is deprecated and " +
s"will be ignored. Sort merge join will continue to be used.")
@@ -118,7 +118,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
(keyValueOutput, runFunc)
case Some((SQLConf.Deprecated.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED, Some(value))) =>
- val runFunc = (sqlContext: SQLContext) => {
+ val runFunc = (sparkSession: SparkSession) => {
logWarning(
s"Property ${SQLConf.Deprecated.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED} is " +
s"deprecated and will be ignored. Vectorized parquet reader will be used instead.")
@@ -128,25 +128,25 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
// Configures a single property.
case Some((key, Some(value))) =>
- val runFunc = (sqlContext: SQLContext) => {
- sqlContext.setConf(key, value)
+ val runFunc = (sparkSession: SparkSession) => {
+ sparkSession.setConf(key, value)
Seq(Row(key, value))
}
(keyValueOutput, runFunc)
// (In Hive, "SET" returns all changed properties while "SET -v" returns all properties.)
- // Queries all key-value pairs that are set in the SQLConf of the sqlContext.
+ // Queries all key-value pairs that are set in the SQLConf of the sparkSession.
case None =>
- val runFunc = (sqlContext: SQLContext) => {
- sqlContext.getAllConfs.map { case (k, v) => Row(k, v) }.toSeq
+ val runFunc = (sparkSession: SparkSession) => {
+ sparkSession.getAllConfs.map { case (k, v) => Row(k, v) }.toSeq
}
(keyValueOutput, runFunc)
// Queries all properties along with their default values and docs that are defined in the
- // SQLConf of the sqlContext.
+ // SQLConf of the sparkSession.
case Some(("-v", None)) =>
- val runFunc = (sqlContext: SQLContext) => {
- sqlContext.conf.getAllDefinedConfs.map { case (key, defaultValue, doc) =>
+ val runFunc = (sparkSession: SparkSession) => {
+ sparkSession.sessionState.conf.getAllDefinedConfs.map { case (key, defaultValue, doc) =>
Row(key, defaultValue, doc)
}
}
@@ -158,19 +158,21 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
// Queries the deprecated "mapred.reduce.tasks" property.
case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) =>
- val runFunc = (sqlContext: SQLContext) => {
+ val runFunc = (sparkSession: SparkSession) => {
logWarning(
s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"showing ${SQLConf.SHUFFLE_PARTITIONS.key} instead.")
- Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, sqlContext.conf.numShufflePartitions.toString))
+ Seq(Row(
+ SQLConf.SHUFFLE_PARTITIONS.key,
+ sparkSession.sessionState.conf.numShufflePartitions.toString))
}
(keyValueOutput, runFunc)
// Queries a single property.
case Some((key, None)) =>
- val runFunc = (sqlContext: SQLContext) => {
+ val runFunc = (sparkSession: SparkSession) => {
val value =
- try sqlContext.getConf(key) catch {
+ try sparkSession.getConf(key) catch {
case _: NoSuchElementException => "<undefined>"
}
Seq(Row(key, value))
@@ -180,6 +182,6 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
override val output: Seq[Attribute] = _output
- override def run(sqlContext: SQLContext): Seq[Row] = runFunc(sqlContext)
+ override def run(sparkSession: SparkSession): Seq[Row] = runFunc(sparkSession)
}
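Each SET variant above is now a function of the session rather than of a SQLContext. A tiny sketch of that shape (the key and value literals are only examples, not from the commit):

import org.apache.spark.sql.{Row, SparkSession}

object SetCommandSketch {
  // Setting a property writes through sparkSession.setConf and echoes the pair back as a Row.
  val setAndEcho: SparkSession => Seq[Row] = { sparkSession =>
    sparkSession.setConf("spark.sql.shuffle.partitions", "200")
    Seq(Row("spark.sql.shuffle.partitions", "200"))
  }
}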
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 5be5d0c..c283bd6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.command
-import org.apache.spark.sql.{Dataset, Row, SQLContext}
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -28,15 +28,15 @@ case class CacheTableCommand(
isLazy: Boolean)
extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
plan.foreach { logicalPlan =>
- sqlContext.registerDataFrameAsTable(Dataset.ofRows(sqlContext, logicalPlan), tableName)
+ sparkSession.registerDataFrameAsTable(Dataset.ofRows(sparkSession, logicalPlan), tableName)
}
- sqlContext.cacheTable(tableName)
+ sparkSession.cacheTable(tableName)
if (!isLazy) {
// Performs eager caching
- sqlContext.table(tableName).count()
+ sparkSession.table(tableName).count()
}
Seq.empty[Row]
@@ -48,8 +48,8 @@ case class CacheTableCommand(
case class UncacheTableCommand(tableName: String) extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- sqlContext.table(tableName).unpersist(blocking = false)
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ sparkSession.table(tableName).unpersist(blocking = false)
Seq.empty[Row]
}
@@ -61,8 +61,8 @@ case class UncacheTableCommand(tableName: String) extends RunnableCommand {
*/
case object ClearCacheCommand extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- sqlContext.clearCache()
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ sparkSession.clearCache()
Seq.empty[Row]
}
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 0fd7fa9..7bb59b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -18,8 +18,8 @@
package org.apache.spark.sql.execution.command
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Dataset, Row, SQLContext}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical
@@ -35,7 +35,7 @@ import org.apache.spark.sql.types._
private[sql] trait RunnableCommand extends LogicalPlan with logical.Command {
override def output: Seq[Attribute] = Seq.empty
override def children: Seq[LogicalPlan] = Seq.empty
- def run(sqlContext: SQLContext): Seq[Row]
+ def run(sparkSession: SparkSession): Seq[Row]
}
/**
@@ -54,7 +54,7 @@ private[sql] case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkP
*/
protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
val converter = CatalystTypeConverters.createToCatalystConverter(schema)
- cmd.run(sqlContext).map(converter(_).asInstanceOf[InternalRow])
+ cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
}
override def output: Seq[Attribute] = cmd.output
@@ -97,8 +97,8 @@ case class ExplainCommand(
extends RunnableCommand {
// Run through the optimizer to generate the physical plan.
- override def run(sqlContext: SQLContext): Seq[Row] = try {
- val queryExecution = sqlContext.executePlan(logicalPlan)
+ override def run(sparkSession: SparkSession): Seq[Row] = try {
+ val queryExecution = sparkSession.executePlan(logicalPlan)
val outputString =
if (codegen) {
codegenString(queryExecution.executedPlan)
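RunnableCommand implementations now receive the SparkSession directly, while ExecutedCommandExec bridges from the physical plan via sqlContext.sparkSession. A hedged sketch of a command under the new contract (the command itself is hypothetical, not part of this commit; RunnableCommand is private[sql], so it would sit in the same package as the commands above):

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.StringType

case object ShowCurrentDatabaseCommand extends RunnableCommand {
  // One string column, mirroring the output style of the commands in this patch.
  override val output: Seq[Attribute] =
    AttributeReference("currentDatabase", StringType, nullable = false)() :: Nil

  override def run(sparkSession: SparkSession): Seq[Row] =
    Seq(Row(sparkSession.sessionState.catalog.getCurrentDatabase))
}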
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 0ef1d1d..31900b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -55,7 +55,7 @@ case class CreateDataSourceTableCommand(
managedIfNoPath: Boolean)
extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
// Since we are saving metadata to metastore, we need to check if metastore supports
// the table name and database name we have for this query. MetaStoreUtils.validateName
// is the method used by Hive to check if a table name or a database name is valid for
@@ -72,7 +72,7 @@ case class CreateDataSourceTableCommand(
}
val tableName = tableIdent.unquotedString
- val sessionState = sqlContext.sessionState
+ val sessionState = sparkSession.sessionState
if (sessionState.catalog.tableExists(tableIdent)) {
if (ignoreIfExists) {
@@ -93,14 +93,14 @@ case class CreateDataSourceTableCommand(
// Create the relation to validate the arguments before writing the metadata to the metastore.
DataSource(
- sqlContext = sqlContext,
+ sparkSession = sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
className = provider,
bucketSpec = None,
options = optionsWithPath).resolveRelation()
CreateDataSourceTableUtils.createDataSourceTable(
- sqlContext = sqlContext,
+ sparkSession = sparkSession,
tableIdent = tableIdent,
userSpecifiedSchema = userSpecifiedSchema,
partitionColumns = Array.empty[String],
@@ -136,7 +136,7 @@ case class CreateDataSourceTableAsSelectCommand(
query: LogicalPlan)
extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
// Since we are saving metadata to metastore, we need to check if metastore supports
// the table name and database name we have for this query. MetaStoreUtils.validateName
// is the method used by Hive to check if a table name or a database name is valid for
@@ -153,7 +153,7 @@ case class CreateDataSourceTableAsSelectCommand(
}
val tableName = tableIdent.unquotedString
- val sessionState = sqlContext.sessionState
+ val sessionState = sparkSession.sessionState
var createMetastoreTable = false
var isExternal = true
val optionsWithPath =
@@ -165,7 +165,7 @@ case class CreateDataSourceTableAsSelectCommand(
}
var existingSchema = None: Option[StructType]
- if (sqlContext.sessionState.catalog.tableExists(tableIdent)) {
+ if (sparkSession.sessionState.catalog.tableExists(tableIdent)) {
// Check if we need to throw an exception or just return.
mode match {
case SaveMode.ErrorIfExists =>
@@ -180,7 +180,7 @@ case class CreateDataSourceTableAsSelectCommand(
case SaveMode.Append =>
// Check if the specified data source match the data source of the existing table.
val dataSource = DataSource(
- sqlContext = sqlContext,
+ sparkSession = sparkSession,
userSpecifiedSchema = Some(query.schema.asNullable),
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
@@ -197,7 +197,7 @@ case class CreateDataSourceTableAsSelectCommand(
throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
}
case SaveMode.Overwrite =>
- sqlContext.sql(s"DROP TABLE IF EXISTS $tableName")
+ sparkSession.sql(s"DROP TABLE IF EXISTS $tableName")
// Need to create the table again.
createMetastoreTable = true
}
@@ -206,7 +206,7 @@ case class CreateDataSourceTableAsSelectCommand(
createMetastoreTable = true
}
- val data = Dataset.ofRows(sqlContext, query)
+ val data = Dataset.ofRows(sparkSession, query)
val df = existingSchema match {
// If we are inserting into an existing table, just use the existing schema.
case Some(s) => data.selectExpr(s.fieldNames: _*)
@@ -215,7 +215,7 @@ case class CreateDataSourceTableAsSelectCommand(
// Create the relation based on the data of df.
val dataSource = DataSource(
- sqlContext,
+ sparkSession,
className = provider,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
@@ -228,7 +228,7 @@ case class CreateDataSourceTableAsSelectCommand(
// the schema of df). It is important since the nullability may be changed by the relation
// provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
CreateDataSourceTableUtils.createDataSourceTable(
- sqlContext = sqlContext,
+ sparkSession = sparkSession,
tableIdent = tableIdent,
userSpecifiedSchema = Some(result.schema),
partitionColumns = partitionColumns,
@@ -260,7 +260,7 @@ object CreateDataSourceTableUtils extends Logging {
}
def createDataSourceTable(
- sqlContext: SQLContext,
+ sparkSession: SparkSession,
tableIdent: TableIdentifier,
userSpecifiedSchema: Option[StructType],
partitionColumns: Array[String],
@@ -275,7 +275,7 @@ object CreateDataSourceTableUtils extends Logging {
// stored into a single metastore SerDe property. In this case, we split the JSON string and
// store each part as a separate SerDe property.
userSpecifiedSchema.foreach { schema =>
- val threshold = sqlContext.sessionState.conf.schemaStringLengthThreshold
+ val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
@@ -329,10 +329,10 @@ object CreateDataSourceTableUtils extends Logging {
CatalogTableType.MANAGED_TABLE
}
- val maybeSerDe = HiveSerDe.sourceToSerDe(provider, sqlContext.sessionState.conf)
+ val maybeSerDe = HiveSerDe.sourceToSerDe(provider, sparkSession.sessionState.conf)
val dataSource =
DataSource(
- sqlContext,
+ sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
@@ -432,7 +432,7 @@ object CreateDataSourceTableUtils extends Logging {
// specific way.
try {
logInfo(message)
- sqlContext.sessionState.catalog.createTable(table, ignoreIfExists = false)
+ sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
} catch {
case NonFatal(e) =>
val warningMessage =
@@ -440,13 +440,13 @@ object CreateDataSourceTableUtils extends Logging {
s"it into Hive metastore in Spark SQL specific format."
logWarning(warningMessage, e)
val table = newSparkSQLSpecificMetastoreTable()
- sqlContext.sessionState.catalog.createTable(table, ignoreIfExists = false)
+ sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
}
case (None, message) =>
logWarning(message)
val table = newSparkSQLSpecificMetastoreTable()
- sqlContext.sessionState.catalog.createTable(table, ignoreIfExists = false)
+ sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
}
}
}
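The DataSource builder used throughout this file is likewise keyed off the SparkSession. A small illustrative sketch (the helper name, provider, and path are assumptions, not from the commit):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.DataSource

object DataSourceSketch {
  // Resolves a relation the same way CreateDataSourceTableCommand does above,
  // passing the session instead of a SQLContext.
  def resolveTable(sparkSession: SparkSession, provider: String, path: String) =
    DataSource(
      sparkSession = sparkSession,
      className = provider,
      options = Map("path" -> path)).resolveRelation()
}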
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala
index 33cc10d..cefe0f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.command
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.StringType
@@ -38,10 +38,10 @@ case class ShowDatabasesCommand(databasePattern: Option[String]) extends Runnabl
AttributeReference("result", StringType, nullable = false)() :: Nil
}
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val catalog = sqlContext.sessionState.catalog
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
val databases =
- databasePattern.map(catalog.listDatabases(_)).getOrElse(catalog.listDatabases())
+ databasePattern.map(catalog.listDatabases).getOrElse(catalog.listDatabases())
databases.map { d => Row(d) }
}
}
@@ -55,8 +55,8 @@ case class ShowDatabasesCommand(databasePattern: Option[String]) extends Runnabl
*/
case class SetDatabaseCommand(databaseName: String) extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- sqlContext.sessionState.catalog.setCurrentDatabase(databaseName)
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ sparkSession.sessionState.catalog.setCurrentDatabase(databaseName)
Seq.empty[Row]
}
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 85f0066..f5aa8fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
import scala.util.control.NonFatal
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
@@ -38,8 +38,8 @@ import org.apache.spark.sql.types._
*/
abstract class NativeDDLCommand(val sql: String) extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- sqlContext.runNativeSql(sql)
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ sparkSession.runNativeSql(sql)
}
override val output: Seq[Attribute] = {
@@ -66,8 +66,8 @@ case class CreateDatabase(
props: Map[String, String])
extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val catalog = sqlContext.sessionState.catalog
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
catalog.createDatabase(
CatalogDatabase(
databaseName,
@@ -104,8 +104,8 @@ case class DropDatabase(
cascade: Boolean)
extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- sqlContext.sessionState.catalog.dropDatabase(databaseName, ifExists, cascade)
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ sparkSession.sessionState.catalog.dropDatabase(databaseName, ifExists, cascade)
Seq.empty[Row]
}
@@ -126,8 +126,8 @@ case class AlterDatabaseProperties(
props: Map[String, String])
extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val catalog = sqlContext.sessionState.catalog
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
val db: CatalogDatabase = catalog.getDatabaseMetadata(databaseName)
catalog.alterDatabase(db.copy(properties = db.properties ++ props))
@@ -152,9 +152,9 @@ case class DescribeDatabase(
extended: Boolean)
extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
val dbMetadata: CatalogDatabase =
- sqlContext.sessionState.catalog.getDatabaseMetadata(databaseName)
+ sparkSession.sessionState.catalog.getDatabaseMetadata(databaseName)
val result =
Row("Database Name", dbMetadata.name) ::
Row("Description", dbMetadata.description) ::
@@ -193,8 +193,8 @@ case class DropTable(
ifExists: Boolean,
isView: Boolean) extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val catalog = sqlContext.sessionState.catalog
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
if (!catalog.tableExists(tableName)) {
if (!ifExists) {
val objectName = if (isView) "View" else "Table"
@@ -213,7 +213,7 @@ case class DropTable(
case _ =>
})
try {
- sqlContext.cacheManager.tryUncacheQuery(sqlContext.table(tableName.quotedString))
+ sparkSession.cacheManager.tryUncacheQuery(sparkSession.table(tableName.quotedString))
} catch {
case NonFatal(e) => log.warn(s"${e.getMessage}", e)
}
@@ -239,8 +239,8 @@ case class AlterTableSetProperties(
isView: Boolean)
extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val catalog = sqlContext.sessionState.catalog
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
DDLUtils.verifyAlterTableType(catalog, tableName, isView)
val table = catalog.getTableMetadata(tableName)
val newProperties = table.properties ++ properties
@@ -271,8 +271,8 @@ case class AlterTableUnsetProperties(
isView: Boolean)
extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val catalog = sqlContext.sessionState.catalog
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
DDLUtils.verifyAlterTableType(catalog, tableName, isView)
val table = catalog.getTableMetadata(tableName)
if (DDLUtils.isDatasourceTable(table)) {
@@ -315,8 +315,8 @@ case class AlterTableSerDeProperties(
require(serdeClassName.isDefined || serdeProperties.isDefined,
"alter table attempted to set neither serde class name nor serde properties")
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val catalog = sqlContext.sessionState.catalog
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
// Do not support setting serde for datasource tables
if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) {
@@ -350,8 +350,8 @@ case class AlterTableAddPartition(
ifNotExists: Boolean)
extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val catalog = sqlContext.sessionState.catalog
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
@@ -381,8 +381,8 @@ case class AlterTableRenamePartition(
newPartition: TablePartitionSpec)
extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- sqlContext.sessionState.catalog.renamePartitions(
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ sparkSession.sessionState.catalog.renamePartitions(
tableName, Seq(oldPartition), Seq(newPartition))
Seq.empty[Row]
}
@@ -409,8 +409,8 @@ case class AlterTableDropPartition(
ifExists: Boolean)
extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val catalog = sqlContext.sessionState.catalog
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
@@ -446,8 +446,8 @@ case class AlterTableSetLocation(
location: String)
extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val catalog = sqlContext.sessionState.catalog
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
partitionSpec match {
case Some(spec) =>
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index 89ccacd..5aa779d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.command
-import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogFunction
import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionInfo}
@@ -47,8 +47,8 @@ case class CreateFunction(
isTemp: Boolean)
extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val catalog = sqlContext.sessionState.catalog
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
if (isTemp) {
if (databaseName.isDefined) {
throw new AnalysisException(
@@ -99,7 +99,7 @@ case class DescribeFunction(
}
}
- override def run(sqlContext: SQLContext): Seq[Row] = {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
// Hard code "<>", "!=", "between", and "case" for now as there is no corresponding functions.
functionName.toLowerCase match {
case "<>" =>
@@ -116,7 +116,7 @@ case class DescribeFunction(
Row(s"Function: case") ::
Row(s"Usage: CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END - " +
s"When a = b, returns c; when a = d, return e; else return f") :: Nil
- case _ => sqlContext.sessionState.functionRegistry.lookupFunction(functionName) match {
+ case _ => sparkSession.sessionState.functionRegistry.lookupFunction(functionName) match {
case Some(info) =>
val result =
Row(s"Function: ${info.getName}") ::
@@ -149,8 +149,8 @@ case class DropFunction(
isTemp: Boolean)
extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val catalog = sqlContext.sessionState.catalog
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
if (isTemp) {
if (databaseName.isDefined) {
throw new AnalysisException(
@@ -187,12 +187,12 @@ case class ShowFunctions(db: Option[String], pattern: Option[String]) extends Ru
schema.toAttributes
}
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val dbName = db.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase)
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val dbName = db.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase)
// If pattern is not specified, we use '*', which is used to
// match any sequence of characters (including no characters).
val functionNames =
- sqlContext.sessionState.catalog
+ sparkSession.sessionState.catalog
.listFunctions(dbName, pattern.getOrElse("*"))
.map(_.unquotedString)
// The session catalog caches some persistent functions in the FunctionRegistry
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
index fc7ecb1..29bcb30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.command
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
@@ -31,8 +31,8 @@ case class AddJar(path: String) extends RunnableCommand {
schema.toAttributes
}
- override def run(sqlContext: SQLContext): Seq[Row] = {
- sqlContext.sessionState.addJar(path)
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ sparkSession.sessionState.addJar(path)
Seq(Row(0))
}
}
@@ -41,8 +41,8 @@ case class AddJar(path: String) extends RunnableCommand {
* Adds a file to the current session so it can be used.
*/
case class AddFile(path: String) extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- sqlContext.sparkContext.addFile(path)
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ sparkSession.sparkContext.addFile(path)
Seq.empty[Row]
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 5cac9d8..700a704 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -22,7 +22,7 @@ import java.net.URI
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType, ExternalCatalog}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -60,8 +60,8 @@ case class CreateTableLike(
sourceTable: TableIdentifier,
ifNotExists: Boolean) extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val catalog = sqlContext.sessionState.catalog
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
if (!catalog.tableExists(sourceTable)) {
throw new AnalysisException(
s"Source table in CREATE TABLE LIKE does not exist: '$sourceTable'")
@@ -109,8 +109,8 @@ case class CreateTableLike(
*/
case class CreateTable(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- sqlContext.sessionState.catalog.createTable(table, ifNotExists)
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ sparkSession.sessionState.catalog.createTable(table, ifNotExists)
Seq.empty[Row]
}
@@ -132,8 +132,8 @@ case class AlterTableRename(
isView: Boolean)
extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val catalog = sqlContext.sessionState.catalog
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
DDLUtils.verifyAlterTableType(catalog, oldName, isView)
catalog.invalidateTable(oldName)
catalog.renameTable(oldName, newName)
@@ -158,8 +158,8 @@ case class LoadData(
isOverwrite: Boolean,
partition: Option[ExternalCatalog.TablePartitionSpec]) extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val catalog = sqlContext.sessionState.catalog
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
if (!catalog.tableExists(table)) {
throw new AnalysisException(
s"Table in LOAD DATA does not exist: '$table'")
@@ -210,7 +210,7 @@ case class LoadData(
// Follow Hive's behavior:
// If no schema or authority is provided with non-local inpath,
// we will use hadoop configuration "fs.default.name".
- val defaultFSConf = sqlContext.sessionState.hadoopConf.get("fs.default.name")
+ val defaultFSConf = sparkSession.sessionState.hadoopConf.get("fs.default.name")
val defaultFS = if (defaultFSConf == null) {
new URI("")
} else {
@@ -285,9 +285,9 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean)
new MetadataBuilder().putString("comment", "comment of the column").build())()
)
- override def run(sqlContext: SQLContext): Seq[Row] = {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
val result = new ArrayBuffer[Row]
- sqlContext.sessionState.catalog.lookupRelation(table) match {
+ sparkSession.sessionState.catalog.lookupRelation(table) match {
case catalogRelation: CatalogRelation =>
catalogRelation.catalogTable.schema.foreach { column =>
result += Row(column.name, column.dataType, column.comment.orNull)
@@ -333,10 +333,10 @@ case class ShowTablesCommand(
AttributeReference("isTemporary", BooleanType, nullable = false)() :: Nil
}
- override def run(sqlContext: SQLContext): Seq[Row] = {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
// Since we need to return a Seq of rows, we will call getTables directly
- // instead of calling tables in sqlContext.
- val catalog = sqlContext.sessionState.catalog
+ // instead of calling tables in sparkSession.
+ val catalog = sparkSession.sessionState.catalog
val db = databaseName.getOrElse(catalog.getCurrentDatabase)
val tables =
tableIdentifierPattern.map(catalog.listTables(db, _)).getOrElse(catalog.listTables(db))
@@ -368,13 +368,13 @@ case class ShowTablePropertiesCommand(table: TableIdentifier, propertyKey: Optio
}
}
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val catalog = sqlContext.sessionState.catalog
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
if (catalog.isTemporaryTable(table)) {
Seq.empty[Row]
} else {
- val catalogTable = sqlContext.sessionState.catalog.getTableMetadata(table)
+ val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(table)
propertyKey match {
case Some(p) =>
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 07cc4a9..f42b56f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command
import scala.util.control.NonFatal
-import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.SQLBuilder
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
@@ -62,14 +62,14 @@ case class CreateViewCommand(
"It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
}
- override def run(sqlContext: SQLContext): Seq[Row] = {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
// If the plan cannot be analyzed, throw an exception and don't proceed.
- val qe = sqlContext.executePlan(child)
+ val qe = sparkSession.executePlan(child)
qe.assertAnalyzed()
val analyzedPlan = qe.analyzed
require(tableDesc.schema == Nil || tableDesc.schema.length == analyzedPlan.output.length)
- val sessionState = sqlContext.sessionState
+ val sessionState = sparkSession.sessionState
if (sessionState.catalog.tableExists(tableIdentifier)) {
if (allowExisting) {
@@ -77,7 +77,7 @@ case class CreateViewCommand(
// already exists.
} else if (replace) {
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
- sessionState.catalog.alterTable(prepareTable(sqlContext, analyzedPlan))
+ sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
} else {
// Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
// exists.
@@ -88,7 +88,7 @@ case class CreateViewCommand(
} else {
// Create the view if it doesn't exist.
sessionState.catalog.createTable(
- prepareTable(sqlContext, analyzedPlan), ignoreIfExists = false)
+ prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
}
Seq.empty[Row]
@@ -98,9 +98,9 @@ case class CreateViewCommand(
* Returns a [[CatalogTable]] that can be used to save in the catalog. This comment canonicalize
* SQL based on the analyzed plan, and also creates the proper schema for the view.
*/
- private def prepareTable(sqlContext: SQLContext, analyzedPlan: LogicalPlan): CatalogTable = {
+ private def prepareTable(sparkSession: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
val viewSQL: String =
- if (sqlContext.conf.canonicalView) {
+ if (sparkSession.sessionState.conf.canonicalView) {
val logicalPlan =
if (tableDesc.schema.isEmpty) {
analyzedPlan
@@ -108,7 +108,7 @@ case class CreateViewCommand(
val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
case (attr, col) => Alias(attr, col.name)()
}
- sqlContext.executePlan(Project(projectList, analyzedPlan)).analyzed
+ sparkSession.executePlan(Project(projectList, analyzedPlan)).analyzed
}
new SQLBuilder(logicalPlan).toSQL
} else {
@@ -134,7 +134,7 @@ case class CreateViewCommand(
// Validate the view SQL - make sure we can parse it and analyze it.
// If we cannot analyze the generated query, there is probably a bug in SQL generation.
try {
- sqlContext.sql(viewSQL).queryExecution.assertAnalyzed()
+ sparkSession.sql(viewSQL).queryExecution.assertAnalyzed()
} catch {
case NonFatal(e) =>
throw new RuntimeException(
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 4e7214c..ef626ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -59,7 +59,7 @@ import org.apache.spark.util.Utils
* @param bucketSpec An optional specification for bucketing (hash-partitioning) of the data.
*/
case class DataSource(
- sqlContext: SQLContext,
+ sparkSession: SparkSession,
className: String,
paths: Seq[String] = Nil,
userSpecifiedSchema: Option[StructType] = None,
@@ -131,15 +131,15 @@ case class DataSource(
val allPaths = caseInsensitiveOptions.get("path")
val globbedPaths = allPaths.toSeq.flatMap { path =>
val hdfsPath = new Path(path)
- val fs = hdfsPath.getFileSystem(sqlContext.sessionState.hadoopConf)
+ val fs = hdfsPath.getFileSystem(sparkSession.sessionState.hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualified)
}.toArray
- val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths, None)
+ val fileCatalog: FileCatalog = new HDFSFileCatalog(sparkSession, options, globbedPaths, None)
userSpecifiedSchema.orElse {
format.inferSchema(
- sqlContext,
+ sparkSession,
caseInsensitiveOptions,
fileCatalog.allFiles())
}.getOrElse {
@@ -151,7 +151,8 @@ case class DataSource(
private def sourceSchema(): SourceInfo = {
providingClass.newInstance() match {
case s: StreamSourceProvider =>
- val (name, schema) = s.sourceSchema(sqlContext, userSpecifiedSchema, className, options)
+ val (name, schema) = s.sourceSchema(
+ sparkSession.wrapped, userSpecifiedSchema, className, options)
SourceInfo(name, schema)
case format: FileFormat =>
@@ -171,7 +172,7 @@ case class DataSource(
def createSource(metadataPath: String): Source = {
providingClass.newInstance() match {
case s: StreamSourceProvider =>
- s.createSource(sqlContext, metadataPath, userSpecifiedSchema, className, options)
+ s.createSource(sparkSession.wrapped, metadataPath, userSpecifiedSchema, className, options)
case format: FileFormat =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
@@ -183,16 +184,16 @@ case class DataSource(
val newOptions = options.filterKeys(_ != "path") + ("basePath" -> path)
val newDataSource =
DataSource(
- sqlContext,
+ sparkSession,
paths = files,
userSpecifiedSchema = Some(sourceInfo.schema),
className = className,
options = new CaseInsensitiveMap(newOptions))
- Dataset.ofRows(sqlContext, LogicalRelation(newDataSource.resolveRelation()))
+ Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
}
new FileStreamSource(
- sqlContext, metadataPath, path, sourceInfo.schema, dataFrameBuilder)
+ sparkSession, metadataPath, path, sourceInfo.schema, dataFrameBuilder)
case _ =>
throw new UnsupportedOperationException(
s"Data source $className does not support streamed reading")
@@ -202,14 +203,14 @@ case class DataSource(
/** Returns a sink that can be used to continually write data. */
def createSink(): Sink = {
providingClass.newInstance() match {
- case s: StreamSinkProvider => s.createSink(sqlContext, options, partitionColumns)
+ case s: StreamSinkProvider => s.createSink(sparkSession.wrapped, options, partitionColumns)
case format: FileFormat =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
})
- new FileStreamSink(sqlContext, path, format)
+ new FileStreamSink(sparkSession, path, format)
case _ =>
throw new UnsupportedOperationException(
s"Data source $className does not support streamed writing")
@@ -225,7 +226,7 @@ case class DataSource(
case Seq(singlePath) =>
try {
val hdfsPath = new Path(singlePath)
- val fs = hdfsPath.getFileSystem(sqlContext.sessionState.hadoopConf)
+ val fs = hdfsPath.getFileSystem(sparkSession.sessionState.hadoopConf)
val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir)
val res = fs.exists(metadataPath)
res
@@ -244,9 +245,9 @@ case class DataSource(
val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
// TODO: Throw when too much is given.
case (dataSource: SchemaRelationProvider, Some(schema)) =>
- dataSource.createRelation(sqlContext, caseInsensitiveOptions, schema)
+ dataSource.createRelation(sparkSession.wrapped, caseInsensitiveOptions, schema)
case (dataSource: RelationProvider, None) =>
- dataSource.createRelation(sqlContext, caseInsensitiveOptions)
+ dataSource.createRelation(sparkSession.wrapped, caseInsensitiveOptions)
case (_: SchemaRelationProvider, None) =>
throw new AnalysisException(s"A schema needs to be specified when using $className.")
case (_: RelationProvider, Some(_)) =>
@@ -257,11 +258,10 @@ case class DataSource(
case (format: FileFormat, _)
if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
- val fileCatalog =
- new StreamFileCatalog(sqlContext, basePath)
+ val fileCatalog = new StreamFileCatalog(sparkSession, basePath)
val dataSchema = userSpecifiedSchema.orElse {
format.inferSchema(
- sqlContext,
+ sparkSession,
caseInsensitiveOptions,
fileCatalog.allFiles())
}.getOrElse {
@@ -271,7 +271,7 @@ case class DataSource(
}
HadoopFsRelation(
- sqlContext,
+ sparkSession,
fileCatalog,
partitionSchema = fileCatalog.partitionSpec().partitionColumns,
dataSchema = dataSchema,
@@ -284,7 +284,7 @@ case class DataSource(
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val globbedPaths = allPaths.flatMap { path =>
val hdfsPath = new Path(path)
- val fs = hdfsPath.getFileSystem(sqlContext.sessionState.hadoopConf)
+ val fs = hdfsPath.getFileSystem(sparkSession.sessionState.hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
@@ -311,11 +311,11 @@ case class DataSource(
}
val fileCatalog: FileCatalog =
- new HDFSFileCatalog(sqlContext, options, globbedPaths, partitionSchema)
+ new HDFSFileCatalog(sparkSession, options, globbedPaths, partitionSchema)
val dataSchema = userSpecifiedSchema.map { schema =>
val equality =
- if (sqlContext.conf.caseSensitiveAnalysis) {
+ if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
} else {
org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
@@ -324,7 +324,7 @@ case class DataSource(
StructType(schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
}.orElse {
format.inferSchema(
- sqlContext,
+ sparkSession,
caseInsensitiveOptions,
fileCatalog.allFiles())
}.getOrElse {
@@ -334,10 +334,10 @@ case class DataSource(
}
val enrichedOptions =
- format.prepareRead(sqlContext, caseInsensitiveOptions, fileCatalog.allFiles())
+ format.prepareRead(sparkSession, caseInsensitiveOptions, fileCatalog.allFiles())
HadoopFsRelation(
- sqlContext,
+ sparkSession,
fileCatalog,
partitionSchema = fileCatalog.partitionSpec().partitionColumns,
dataSchema = dataSchema.asNullable,
@@ -363,7 +363,7 @@ case class DataSource(
providingClass.newInstance() match {
case dataSource: CreatableRelationProvider =>
- dataSource.createRelation(sqlContext, mode, options, data)
+ dataSource.createRelation(sparkSession.wrapped, mode, options, data)
case format: FileFormat =>
// Don't glob path for the write path. The contracts here are:
// 1. Only one output path can be specified on the write path;
@@ -374,11 +374,11 @@ case class DataSource(
val path = new Path(caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
}))
- val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf)
+ val fs = path.getFileSystem(sparkSession.sessionState.hadoopConf)
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
}
- val caseSensitive = sqlContext.conf.caseSensitiveAnalysis
+ val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
PartitioningUtils.validatePartitionColumnDataTypes(
data.schema, partitionColumns, caseSensitive)
@@ -421,7 +421,7 @@ case class DataSource(
options,
data.logicalPlan,
mode)
- sqlContext.executePlan(plan).toRdd
+ sparkSession.executePlan(plan).toRdd
case _ =>
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 60238bd..f7f68b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable
import org.apache.spark.{Partition => RDDPartition, TaskContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{InputFileNameHolder, RDD}
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.vectorized.ColumnarBatch
@@ -51,10 +51,10 @@ case class PartitionedFile(
case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends RDDPartition
class FileScanRDD(
- @transient val sqlContext: SQLContext,
+ @transient private val sparkSession: SparkSession,
readFunction: (PartitionedFile) => Iterator[InternalRow],
@transient val filePartitions: Seq[FilePartition])
- extends RDD[InternalRow](sqlContext.sparkContext, Nil) {
+ extends RDD[InternalRow](sparkSession.sparkContext, Nil) {
override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {
val iterator = new Iterator[Object] with AutoCloseable {
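
FileScanRDD now holds the session as a @transient private field, using it only on the driver to reach the SparkContext it is parented on. A small sketch of constructing the RDD with the types it expects (hypothetical helper, assuming the internal classes shown above):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}

    // Driver-side construction; the session itself is never shipped to executors.
    def buildScan(
        sparkSession: SparkSession,
        readFile: PartitionedFile => Iterator[InternalRow],
        partitions: Seq[FilePartition]): FileScanRDD =
      new FileScanRDD(sparkSession, readFile, partitions)
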
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 751daa0..9e1308b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -74,14 +74,14 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
}
val partitionColumns =
- l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver)
+ l.resolve(files.partitionSchema, files.sparkSession.sessionState.analyzer.resolver)
val partitionSet = AttributeSet(partitionColumns)
val partitionKeyFilters =
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
val dataColumns =
- l.resolve(files.dataSchema, files.sqlContext.sessionState.analyzer.resolver)
+ l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver)
// Partition keys are not available in the statistics of the files.
val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
@@ -107,7 +107,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
val readFile = files.fileFormat.buildReader(
- sqlContext = files.sqlContext,
+ sparkSession = files.sparkSession,
dataSchema = files.dataSchema,
partitionSchema = files.partitionSchema,
requiredSchema = prunedDataSchema,
@@ -115,7 +115,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
options = files.options)
val plannedPartitions = files.bucketSpec match {
- case Some(bucketing) if files.sqlContext.conf.bucketingEnabled =>
+ case Some(bucketing) if files.sparkSession.sessionState.conf.bucketingEnabled =>
logInfo(s"Planning with ${bucketing.numBuckets} buckets")
val bucketed =
selectedPartitions.flatMap { p =>
@@ -134,9 +134,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
}
case _ =>
- val defaultMaxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
- val openCostInBytes = files.sqlContext.conf.filesOpenCostInBytes
- val defaultParallelism = files.sqlContext.sparkContext.defaultParallelism
+ val defaultMaxSplitBytes = files.sparkSession.sessionState.conf.filesMaxPartitionBytes
+ val openCostInBytes = files.sparkSession.sessionState.conf.filesOpenCostInBytes
+ val defaultParallelism = files.sparkSession.sparkContext.defaultParallelism
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / defaultParallelism
val maxSplitBytes = Math.min(defaultMaxSplitBytes,
@@ -195,7 +195,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
DataSourceScanExec.create(
readDataColumns ++ partitionColumns,
new FileScanRDD(
- files.sqlContext,
+ files.sparkSession,
readFile,
plannedPartitions),
files,
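
In the non-bucketed branch above, the split-sizing inputs now come from sessionState.conf and the SparkContext reached through the session. A quick worked illustration of the visible arithmetic (the figures are made up; 4 MB is only the commonly used default of spark.sql.files.openCostInBytes):

    // 10 selected files of 10 MB each, openCostInBytes = 4 MB, defaultParallelism = 8:
    //   totalBytes   = 10 * (10 MB + 4 MB) = 140 MB   (each file is padded by the open cost)
    //   bytesPerCore = 140 MB / 8          = 17.5 MB  (input to the truncated maxSplitBytes line above)
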
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
index 37c2c45..7b15e49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
@@ -32,15 +32,15 @@ private[sql] case class InsertIntoDataSource(
overwrite: Boolean)
extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
- val data = Dataset.ofRows(sqlContext, query)
+ val data = Dataset.ofRows(sparkSession, query)
// Apply the schema of the existing table to the new data.
- val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
+ val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
relation.insert(df, overwrite)
// Invalidate the cache.
- sqlContext.cacheManager.invalidateCache(logicalRelation)
+ sparkSession.cacheManager.invalidateCache(logicalRelation)
Seq.empty[Row]
}
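
The run(sqlContext) to run(sparkSession) signature change above recurs in every RunnableCommand this patch touches. A minimal, hypothetical command showing the new shape (RunnableCommand is an internal trait, so a real implementation lives in Spark's own source tree; the import path is assumed):

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.execution.command.RunnableCommand   // path assumed

    // Does nothing; exists only to illustrate the post-patch signature.
    case class NoopCommand() extends RunnableCommand {
      override def run(sparkSession: SparkSession): Seq[Row] = Seq.empty[Row]
    }
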
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index a636ca2..b2483e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -68,7 +68,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
override def children: Seq[LogicalPlan] = query :: Nil
- override def run(sqlContext: SQLContext): Seq[Row] = {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
// Most formats don't do well with duplicate columns, so lets not allow that
if (query.schema.fieldNames.length != query.schema.fieldNames.distinct.length) {
val duplicateColumns = query.schema.fieldNames.groupBy(identity).collect {
@@ -78,7 +78,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
s"cannot save to file.")
}
- val hadoopConf = new Configuration(sqlContext.sessionState.hadoopConf)
+ val hadoopConf = new Configuration(sparkSession.sessionState.hadoopConf)
val fs = outputPath.getFileSystem(hadoopConf)
val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
@@ -111,14 +111,14 @@ private[sql] case class InsertIntoHadoopFsRelation(
val partitionSet = AttributeSet(partitionColumns)
val dataColumns = query.output.filterNot(partitionSet.contains)
- val queryExecution = Dataset.ofRows(sqlContext, query).queryExecution
- SQLExecution.withNewExecutionId(sqlContext, queryExecution) {
+ val queryExecution = Dataset.ofRows(sparkSession, query).queryExecution
+ SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
val relation =
WriteRelation(
- sqlContext,
+ sparkSession,
dataColumns.toStructType,
qualifiedOutputPath.toString,
- fileFormat.prepareWrite(sqlContext, _, options, dataColumns.toStructType),
+ fileFormat.prepareWrite(sparkSession, _, options, dataColumns.toStructType),
bucketSpec)
val writerContainer = if (partitionColumns.isEmpty && bucketSpec.isEmpty) {
@@ -131,7 +131,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
dataColumns = dataColumns,
inputSchema = query.output,
PartitioningUtils.DEFAULT_PARTITION_NAME,
- sqlContext.conf.getConf(SQLConf.PARTITION_MAX_FILES),
+ sparkSession.getConf(SQLConf.PARTITION_MAX_FILES),
isAppend)
}
@@ -140,7 +140,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
writerContainer.driverSideSetup()
try {
- sqlContext.sparkContext.runJob(queryExecution.toRdd, writerContainer.writeRows _)
+ sparkSession.sparkContext.runJob(queryExecution.toRdd, writerContainer.writeRows _)
writerContainer.commitJob()
refreshFunction()
} catch { case cause: Throwable =>
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index b9527db..3b064a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.InternalRow
@@ -36,9 +36,10 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
+
/** A container for all the details required when writing to a table. */
case class WriteRelation(
- sqlContext: SQLContext,
+ sparkSession: SparkSession,
dataSchema: StructType,
path: String,
prepareJobForWrite: Job => OutputWriterFactory,
@@ -66,7 +67,7 @@ private[sql] abstract class BaseWriterContainer(
@transient private val jobContext: JobContext = job
private val speculationEnabled: Boolean =
- relation.sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)
+ relation.sparkSession.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)
// The following fields are initialized and used on both driver and executor side.
@transient protected var outputCommitter: OutputCommitter = _
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index 7d407a7..fb047ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce._
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
@@ -49,14 +49,14 @@ class DefaultSource extends FileFormat with DataSourceRegister {
override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource]
override def inferSchema(
- sqlContext: SQLContext,
+ sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
val csvOptions = new CSVOptions(options)
// TODO: Move filtering.
val paths = files.filterNot(_.getPath.getName startsWith "_").map(_.getPath.toString)
- val rdd = baseRdd(sqlContext, csvOptions, paths)
+ val rdd = baseRdd(sparkSession, csvOptions, paths)
val firstLine = findFirstLine(csvOptions, rdd)
val firstRow = new LineCsvReader(csvOptions).parseLine(firstLine)
@@ -66,7 +66,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
firstRow.zipWithIndex.map { case (value, index) => s"C$index" }
}
- val parsedRdd = tokenRdd(sqlContext, csvOptions, header, paths)
+ val parsedRdd = tokenRdd(sparkSession, csvOptions, header, paths)
val schema = if (csvOptions.inferSchemaFlag) {
CSVInferSchema.infer(parsedRdd, header, csvOptions.nullValue)
} else {
@@ -80,7 +80,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
}
override def prepareWrite(
- sqlContext: SQLContext,
+ sparkSession: SparkSession,
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
@@ -94,7 +94,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
}
override def buildReader(
- sqlContext: SQLContext,
+ sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
@@ -103,8 +103,8 @@ class DefaultSource extends FileFormat with DataSourceRegister {
val csvOptions = new CSVOptions(options)
val headers = requiredSchema.fields.map(_.name)
- val conf = new Configuration(sqlContext.sessionState.hadoopConf)
- val broadcastedConf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
+ val conf = new Configuration(sparkSession.sessionState.hadoopConf)
+ val broadcastedConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
(file: PartitionedFile) => {
val lineIterator = {
@@ -134,18 +134,18 @@ class DefaultSource extends FileFormat with DataSourceRegister {
}
private def baseRdd(
- sqlContext: SQLContext,
+ sparkSession: SparkSession,
options: CSVOptions,
inputPaths: Seq[String]): RDD[String] = {
- readText(sqlContext, options, inputPaths.mkString(","))
+ readText(sparkSession, options, inputPaths.mkString(","))
}
private def tokenRdd(
- sqlContext: SQLContext,
+ sparkSession: SparkSession,
options: CSVOptions,
header: Array[String],
inputPaths: Seq[String]): RDD[Array[String]] = {
- val rdd = baseRdd(sqlContext, options, inputPaths)
+ val rdd = baseRdd(sparkSession, options, inputPaths)
// Make sure firstLine is materialized before sending to executors
val firstLine = if (options.headerFlag) findFirstLine(options, rdd) else null
CSVRelation.univocityTokenizer(rdd, header, firstLine, options)
@@ -168,14 +168,14 @@ class DefaultSource extends FileFormat with DataSourceRegister {
}
private def readText(
- sqlContext: SQLContext,
+ sparkSession: SparkSession,
options: CSVOptions,
location: String): RDD[String] = {
if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
- sqlContext.sparkContext.textFile(location)
+ sparkSession.sparkContext.textFile(location)
} else {
val charset = options.charset
- sqlContext.sparkContext
+ sparkSession.sparkContext
.hadoopFile[LongWritable, Text, TextInputFormat](location)
.mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
}
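
The CSV source shows the FileFormat hooks (inferSchema, prepareWrite, buildReader) all switching their first parameter to SparkSession. A small sketch of calling the inference hook after the change (the helper is hypothetical; the signature is the one visible above):

    import org.apache.hadoop.fs.FileStatus
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.datasources.csv.DefaultSource
    import org.apache.spark.sql.types.StructType

    // Schema inference for CSV files now goes through the session instead of a SQLContext.
    def inferCsvSchema(
        spark: SparkSession,
        options: Map[String, String],
        files: Seq[FileStatus]): Option[StructType] =
      new DefaultSource().inferSchema(spark, options, files)
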
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index e7e94bb..7d0a3d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -74,15 +74,15 @@ case class CreateTempTableUsing(
s"Temporary table '$tableIdent' should not have specified a database")
}
- def run(sqlContext: SQLContext): Seq[Row] = {
+ def run(sparkSession: SparkSession): Seq[Row] = {
val dataSource = DataSource(
- sqlContext,
+ sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
className = provider,
options = options)
- sqlContext.sessionState.catalog.createTempTable(
+ sparkSession.sessionState.catalog.createTempTable(
tableIdent.table,
- Dataset.ofRows(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan,
+ Dataset.ofRows(sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan,
overrideIfExists = true)
Seq.empty[Row]
@@ -102,18 +102,18 @@ case class CreateTempTableUsingAsSelect(
s"Temporary table '$tableIdent' should not have specified a database")
}
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val df = Dataset.ofRows(sqlContext, query)
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val df = Dataset.ofRows(sparkSession, query)
val dataSource = DataSource(
- sqlContext,
+ sparkSession,
className = provider,
partitionColumns = partitionColumns,
bucketSpec = None,
options = options)
val result = dataSource.write(mode, df)
- sqlContext.sessionState.catalog.createTempTable(
+ sparkSession.sessionState.catalog.createTempTable(
tableIdent.table,
- Dataset.ofRows(sqlContext, LogicalRelation(result)).logicalPlan,
+ Dataset.ofRows(sparkSession, LogicalRelation(result)).logicalPlan,
overrideIfExists = true)
Seq.empty[Row]
@@ -123,23 +123,23 @@ case class CreateTempTableUsingAsSelect(
case class RefreshTable(tableIdent: TableIdentifier)
extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
// Refresh the given table's metadata first.
- sqlContext.sessionState.catalog.refreshTable(tableIdent)
+ sparkSession.sessionState.catalog.refreshTable(tableIdent)
// If this table is cached as a InMemoryColumnarRelation, drop the original
// cached version and make the new version cached lazily.
- val logicalPlan = sqlContext.sessionState.catalog.lookupRelation(tableIdent)
+ val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
// Use lookupCachedData directly since RefreshTable also takes databaseName.
- val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
+ val isCached = sparkSession.cacheManager.lookupCachedData(logicalPlan).nonEmpty
if (isCached) {
// Create a data frame to represent the table.
// TODO: Use uncacheTable once it supports database name.
- val df = Dataset.ofRows(sqlContext, logicalPlan)
+ val df = Dataset.ofRows(sparkSession, logicalPlan)
// Uncache the logicalPlan.
- sqlContext.cacheManager.tryUncacheQuery(df, blocking = true)
+ sparkSession.cacheManager.tryUncacheQuery(df, blocking = true)
// Cache it again.
- sqlContext.cacheManager.cacheQuery(df, Some(tableIdent.table))
+ sparkSession.cacheManager.cacheQuery(df, Some(tableIdent.table))
}
Seq.empty[Row]
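
The DDL commands above follow the same pattern: the catalog and the cache manager are reached through the session. A hypothetical helper condensing the temp-table registration path (Dataset.ofRows and createTempTable are internal, so this sketch assumes code living inside Spark's own sql tree):

    import org.apache.spark.sql.{Dataset, SparkSession}
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Register a plan as a temp table via the session's catalog (internal API).
    def registerTemp(sparkSession: SparkSession, name: String, plan: LogicalPlan): Unit =
      sparkSession.sessionState.catalog.createTempTable(
        name,
        Dataset.ofRows(sparkSession, plan).logicalPlan,
        overrideIfExists = true)
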