Posted to commits@spark.apache.org by rx...@apache.org on 2016/04/26 05:54:37 UTC

[3/4] spark git commit: [SPARK-14861][SQL] Replace internal usages of SQLContext with SparkSession
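
The central change in this part of the patch: execution-side entry points (RunnableCommand.run, QueryExecution, SQLExecution.withNewExecutionId, DataSource) now take a SparkSession instead of a SQLContext, and session-scoped state is reached through sparkSession.sessionState. A minimal sketch of the new command signature, based on the trait as it appears in commands.scala below; RunnableCommand is private[sql], so this only compiles inside Spark's own sql packages, and NoopCommand is an illustrative name, not part of the patch:

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.execution.command.RunnableCommand

    case class NoopCommand() extends RunnableCommand {
      // Previously: override def run(sqlContext: SQLContext): Seq[Row]
      override def run(sparkSession: SparkSession): Seq[Row] = {
        // Session-scoped services (catalog, conf, parser) now hang off sessionState.
        val currentDb = sparkSession.sessionState.catalog.getCurrentDatabase
        Seq.empty[Row]
      }
    }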

http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index bb83676..d3d83b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
 import java.sql.Timestamp
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
@@ -39,39 +39,41 @@ import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampT
  * While this is not a public class, we should avoid changing the function names for the sake of
  * changing them, because a lot of developers use the feature for debugging.
  */
-class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
+class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
 
   // TODO: Move the planner an optimizer into here from SessionState.
-  protected def planner = sqlContext.sessionState.planner
-
-  def assertAnalyzed(): Unit = try sqlContext.sessionState.analyzer.checkAnalysis(analyzed) catch {
-    case e: AnalysisException =>
-      val ae = new AnalysisException(e.message, e.line, e.startPosition, Some(analyzed))
-      ae.setStackTrace(e.getStackTrace)
-      throw ae
+  protected def planner = sparkSession.sessionState.planner
+
+  def assertAnalyzed(): Unit = {
+    try sparkSession.sessionState.analyzer.checkAnalysis(analyzed) catch {
+      case e: AnalysisException =>
+        val ae = new AnalysisException(e.message, e.line, e.startPosition, Some(analyzed))
+        ae.setStackTrace(e.getStackTrace)
+        throw ae
+    }
   }
 
   def assertSupported(): Unit = {
-    if (sqlContext.conf.getConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+    if (sparkSession.sessionState.conf.getConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
       UnsupportedOperationChecker.checkForBatch(analyzed)
     }
   }
 
   lazy val analyzed: LogicalPlan = {
-    SQLContext.setActive(sqlContext)
-    sqlContext.sessionState.analyzer.execute(logical)
+    SQLContext.setActive(sparkSession.wrapped)
+    sparkSession.sessionState.analyzer.execute(logical)
   }
 
   lazy val withCachedData: LogicalPlan = {
     assertAnalyzed()
     assertSupported()
-    sqlContext.cacheManager.useCachedData(analyzed)
+    sparkSession.cacheManager.useCachedData(analyzed)
   }
 
-  lazy val optimizedPlan: LogicalPlan = sqlContext.sessionState.optimizer.execute(withCachedData)
+  lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)
 
   lazy val sparkPlan: SparkPlan = {
-    SQLContext.setActive(sqlContext)
+    SQLContext.setActive(sparkSession.wrapped)
     planner.plan(ReturnAnswer(optimizedPlan)).next()
   }
 
@@ -93,10 +95,10 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
   /** A sequence of rules that will be applied in order to the physical plan before execution. */
   protected def preparations: Seq[Rule[SparkPlan]] = Seq(
     python.ExtractPythonUDFs,
-    PlanSubqueries(sqlContext),
-    EnsureRequirements(sqlContext.conf),
-    CollapseCodegenStages(sqlContext.conf),
-    ReuseExchange(sqlContext.conf))
+    PlanSubqueries(sparkSession),
+    EnsureRequirements(sparkSession.sessionState.conf),
+    CollapseCodegenStages(sparkSession.sessionState.conf),
+    ReuseExchange(sparkSession.sessionState.conf))
 
   protected def stringOrError[A](f: => A): String =
     try f.toString catch { case e: Throwable => e.toString }
@@ -110,7 +112,7 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
     case ExecutedCommandExec(desc: DescribeTableCommand) =>
       // If it is a describe command for a Hive table, we want to have the output format
       // be similar with Hive.
-      desc.run(sqlContext).map {
+      desc.run(sparkSession).map {
         case Row(name: String, dataType: String, comment) =>
           Seq(name, dataType,
             Option(comment.asInstanceOf[String]).getOrElse(""))
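
QueryExecution is now bound to a SparkSession: the analyzer, optimizer, planner and conf all come from sparkSession.sessionState, while SQLContext.setActive is still fed the session's wrapped SQLContext. A minimal sketch of driving the phases shown above, assuming access to these internal classes; the helper name and its callers are placeholders:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}

    def physicalPlanOf(spark: SparkSession, plan: LogicalPlan): SparkPlan = {
      val qe = new QueryExecution(spark, plan)
      qe.assertAnalyzed()   // analysis failures now carry the analyzed plan
      qe.sparkPlan          // planner and optimizer come from spark.sessionState
    }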

http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 0a11b16..397d66b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
 import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd,
   SparkListenerSQLExecutionStart}
 import org.apache.spark.util.Utils
@@ -38,21 +38,22 @@ private[sql] object SQLExecution {
    * we can connect them with an execution.
    */
   def withNewExecutionId[T](
-      sqlContext: SQLContext, queryExecution: QueryExecution)(body: => T): T = {
-    val sc = sqlContext.sparkContext
+      sparkSession: SparkSession,
+      queryExecution: QueryExecution)(body: => T): T = {
+    val sc = sparkSession.sparkContext
     val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
     if (oldExecutionId == null) {
       val executionId = SQLExecution.nextExecutionId
       sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
       val r = try {
         val callSite = Utils.getCallSite()
-        sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
+        sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
           executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
         try {
           body
         } finally {
-          sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionEnd(
+          sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionEnd(
             executionId, System.currentTimeMillis()))
         }
       } finally {
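
SQLExecution.withNewExecutionId now takes the SparkSession explicitly instead of a SQLContext. A hedged sketch of the call shape, with a placeholder helper wrapping a QueryExecution built from the same session:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}

    // Runs the body under a fresh execution id so the SQL UI can group the jobs;
    // start/end events are posted on the session's listener bus as shown above.
    def collectWithExecutionId(spark: SparkSession, qe: QueryExecution): Array[InternalRow] =
      SQLExecution.withNewExecutionId(spark, qe) {
        qe.executedPlan.executeCollect()
      }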

http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index e28e456..861ff3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -18,12 +18,10 @@
 package org.apache.spark.sql.execution
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
-import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
-import scala.util.control.NonFatal
 
 import org.apache.spark.{broadcast, SparkEnv}
 import org.apache.spark.internal.Logging

http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
index e6c5351..b6f7808 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
@@ -21,7 +21,7 @@ import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
 
@@ -35,8 +35,8 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
  */
 case class AnalyzeTable(tableName: String) extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val sessionState = sqlContext.sessionState
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val sessionState = sparkSession.sessionState
     val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
     val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
 
@@ -77,7 +77,7 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand {
           catalogTable.storage.locationUri.map { p =>
             val path = new Path(p)
             try {
-              val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf)
+              val fs = path.getFileSystem(sparkSession.sessionState.hadoopConf)
               calculateTableSize(fs, path)
             } catch {
               case NonFatal(e) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
index 39e441f..bf66ea4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.types.StringType
 
@@ -29,7 +29,7 @@ case class HiveNativeCommand(sql: String) extends RunnableCommand {
   override def output: Seq[AttributeReference] =
     Seq(AttributeReference("result", StringType, nullable = false)())
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.sessionState.runNativeSql(sql).map(Row(_))
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    sparkSession.sessionState.runNativeSql(sql).map(Row(_))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index 4daf9e9..952a0d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
 import java.util.NoSuchElementException
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -43,10 +43,10 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
     schema.toAttributes
   }
 
-  private val (_output, runFunc): (Seq[Attribute], SQLContext => Seq[Row]) = kv match {
+  private val (_output, runFunc): (Seq[Attribute], SparkSession => Seq[Row]) = kv match {
     // Configures the deprecated "mapred.reduce.tasks" property.
     case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) =>
-      val runFunc = (sqlContext: SQLContext) => {
+      val runFunc = (sparkSession: SparkSession) => {
         logWarning(
           s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
             s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} instead.")
@@ -56,14 +56,14 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
               "determining the number of reducers is not supported."
           throw new IllegalArgumentException(msg)
         } else {
-          sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS.key, value)
+          sparkSession.setConf(SQLConf.SHUFFLE_PARTITIONS.key, value)
           Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, value))
         }
       }
       (keyValueOutput, runFunc)
 
     case Some((SQLConf.Deprecated.EXTERNAL_SORT, Some(value))) =>
-      val runFunc = (sqlContext: SQLContext) => {
+      val runFunc = (sparkSession: SparkSession) => {
         logWarning(
           s"Property ${SQLConf.Deprecated.EXTERNAL_SORT} is deprecated and will be ignored. " +
             s"External sort will continue to be used.")
@@ -72,7 +72,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
       (keyValueOutput, runFunc)
 
     case Some((SQLConf.Deprecated.USE_SQL_AGGREGATE2, Some(value))) =>
-      val runFunc = (sqlContext: SQLContext) => {
+      val runFunc = (sparkSession: SparkSession) => {
         logWarning(
           s"Property ${SQLConf.Deprecated.USE_SQL_AGGREGATE2} is deprecated and " +
             s"will be ignored. ${SQLConf.Deprecated.USE_SQL_AGGREGATE2} will " +
@@ -82,7 +82,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
       (keyValueOutput, runFunc)
 
     case Some((SQLConf.Deprecated.TUNGSTEN_ENABLED, Some(value))) =>
-      val runFunc = (sqlContext: SQLContext) => {
+      val runFunc = (sparkSession: SparkSession) => {
         logWarning(
           s"Property ${SQLConf.Deprecated.TUNGSTEN_ENABLED} is deprecated and " +
             s"will be ignored. Tungsten will continue to be used.")
@@ -91,7 +91,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
       (keyValueOutput, runFunc)
 
     case Some((SQLConf.Deprecated.CODEGEN_ENABLED, Some(value))) =>
-      val runFunc = (sqlContext: SQLContext) => {
+      val runFunc = (sparkSession: SparkSession) => {
         logWarning(
           s"Property ${SQLConf.Deprecated.CODEGEN_ENABLED} is deprecated and " +
             s"will be ignored. Codegen will continue to be used.")
@@ -100,7 +100,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
       (keyValueOutput, runFunc)
 
     case Some((SQLConf.Deprecated.UNSAFE_ENABLED, Some(value))) =>
-      val runFunc = (sqlContext: SQLContext) => {
+      val runFunc = (sparkSession: SparkSession) => {
         logWarning(
           s"Property ${SQLConf.Deprecated.UNSAFE_ENABLED} is deprecated and " +
             s"will be ignored. Unsafe mode will continue to be used.")
@@ -109,7 +109,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
       (keyValueOutput, runFunc)
 
     case Some((SQLConf.Deprecated.SORTMERGE_JOIN, Some(value))) =>
-      val runFunc = (sqlContext: SQLContext) => {
+      val runFunc = (sparkSession: SparkSession) => {
         logWarning(
           s"Property ${SQLConf.Deprecated.SORTMERGE_JOIN} is deprecated and " +
             s"will be ignored. Sort merge join will continue to be used.")
@@ -118,7 +118,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
       (keyValueOutput, runFunc)
 
     case Some((SQLConf.Deprecated.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED, Some(value))) =>
-      val runFunc = (sqlContext: SQLContext) => {
+      val runFunc = (sparkSession: SparkSession) => {
         logWarning(
           s"Property ${SQLConf.Deprecated.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED} is " +
             s"deprecated and will be ignored. Vectorized parquet reader will be used instead.")
@@ -128,25 +128,25 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
 
     // Configures a single property.
     case Some((key, Some(value))) =>
-      val runFunc = (sqlContext: SQLContext) => {
-        sqlContext.setConf(key, value)
+      val runFunc = (sparkSession: SparkSession) => {
+        sparkSession.setConf(key, value)
         Seq(Row(key, value))
       }
       (keyValueOutput, runFunc)
 
     // (In Hive, "SET" returns all changed properties while "SET -v" returns all properties.)
-    // Queries all key-value pairs that are set in the SQLConf of the sqlContext.
+    // Queries all key-value pairs that are set in the SQLConf of the sparkSession.
     case None =>
-      val runFunc = (sqlContext: SQLContext) => {
-        sqlContext.getAllConfs.map { case (k, v) => Row(k, v) }.toSeq
+      val runFunc = (sparkSession: SparkSession) => {
+        sparkSession.getAllConfs.map { case (k, v) => Row(k, v) }.toSeq
       }
       (keyValueOutput, runFunc)
 
     // Queries all properties along with their default values and docs that are defined in the
-    // SQLConf of the sqlContext.
+    // SQLConf of the sparkSession.
     case Some(("-v", None)) =>
-      val runFunc = (sqlContext: SQLContext) => {
-        sqlContext.conf.getAllDefinedConfs.map { case (key, defaultValue, doc) =>
+      val runFunc = (sparkSession: SparkSession) => {
+        sparkSession.sessionState.conf.getAllDefinedConfs.map { case (key, defaultValue, doc) =>
           Row(key, defaultValue, doc)
         }
       }
@@ -158,19 +158,21 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
 
     // Queries the deprecated "mapred.reduce.tasks" property.
     case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) =>
-      val runFunc = (sqlContext: SQLContext) => {
+      val runFunc = (sparkSession: SparkSession) => {
         logWarning(
           s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
             s"showing ${SQLConf.SHUFFLE_PARTITIONS.key} instead.")
-        Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, sqlContext.conf.numShufflePartitions.toString))
+        Seq(Row(
+          SQLConf.SHUFFLE_PARTITIONS.key,
+          sparkSession.sessionState.conf.numShufflePartitions.toString))
       }
       (keyValueOutput, runFunc)
 
     // Queries a single property.
     case Some((key, None)) =>
-      val runFunc = (sqlContext: SQLContext) => {
+      val runFunc = (sparkSession: SparkSession) => {
         val value =
-          try sqlContext.getConf(key) catch {
+          try sparkSession.getConf(key) catch {
             case _: NoSuchElementException => "<undefined>"
           }
         Seq(Row(key, value))
@@ -180,6 +182,6 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
 
   override val output: Seq[Attribute] = _output
 
-  override def run(sqlContext: SQLContext): Seq[Row] = runFunc(sqlContext)
+  override def run(sparkSession: SparkSession): Seq[Row] = runFunc(sparkSession)
 
 }
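
The substitutions in SetCommand are representative of the whole patch: sqlContext.conf becomes sparkSession.sessionState.conf, while setConf/getConf stay on the session itself. A small illustrative helper (not part of the patch) showing the pattern:

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.internal.SQLConf

    def currentShufflePartitions(sparkSession: SparkSession): Row = {
      // sqlContext.conf.numShufflePartitions -> sparkSession.sessionState.conf.numShufflePartitions
      val n = sparkSession.sessionState.conf.numShufflePartitions
      Row(SQLConf.SHUFFLE_PARTITIONS.key, n.toString)
    }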

http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 5be5d0c..c283bd6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{Dataset, Row, SQLContext}
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
@@ -28,15 +28,15 @@ case class CacheTableCommand(
   isLazy: Boolean)
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
     plan.foreach { logicalPlan =>
-      sqlContext.registerDataFrameAsTable(Dataset.ofRows(sqlContext, logicalPlan), tableName)
+      sparkSession.registerDataFrameAsTable(Dataset.ofRows(sparkSession, logicalPlan), tableName)
     }
-    sqlContext.cacheTable(tableName)
+    sparkSession.cacheTable(tableName)
 
     if (!isLazy) {
       // Performs eager caching
-      sqlContext.table(tableName).count()
+      sparkSession.table(tableName).count()
     }
 
     Seq.empty[Row]
@@ -48,8 +48,8 @@ case class CacheTableCommand(
 
 case class UncacheTableCommand(tableName: String) extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.table(tableName).unpersist(blocking = false)
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    sparkSession.table(tableName).unpersist(blocking = false)
     Seq.empty[Row]
   }
 
@@ -61,8 +61,8 @@ case class UncacheTableCommand(tableName: String) extends RunnableCommand {
  */
 case object ClearCacheCommand extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.clearCache()
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    sparkSession.clearCache()
     Seq.empty[Row]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 0fd7fa9..7bb59b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.sql.execution.command
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Dataset, Row, SQLContext}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical
@@ -35,7 +35,7 @@ import org.apache.spark.sql.types._
 private[sql] trait RunnableCommand extends LogicalPlan with logical.Command {
   override def output: Seq[Attribute] = Seq.empty
   override def children: Seq[LogicalPlan] = Seq.empty
-  def run(sqlContext: SQLContext): Seq[Row]
+  def run(sparkSession: SparkSession): Seq[Row]
 }
 
 /**
@@ -54,7 +54,7 @@ private[sql] case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkP
    */
   protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
     val converter = CatalystTypeConverters.createToCatalystConverter(schema)
-    cmd.run(sqlContext).map(converter(_).asInstanceOf[InternalRow])
+    cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
   }
 
   override def output: Seq[Attribute] = cmd.output
@@ -97,8 +97,8 @@ case class ExplainCommand(
   extends RunnableCommand {
 
   // Run through the optimizer to generate the physical plan.
-  override def run(sqlContext: SQLContext): Seq[Row] = try {
-    val queryExecution = sqlContext.executePlan(logicalPlan)
+  override def run(sparkSession: SparkSession): Seq[Row] = try {
+    val queryExecution = sparkSession.executePlan(logicalPlan)
     val outputString =
       if (codegen) {
         codegenString(queryExecution.executedPlan)

http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 0ef1d1d..31900b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -55,7 +55,7 @@ case class CreateDataSourceTableCommand(
     managedIfNoPath: Boolean)
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
     // Since we are saving metadata to metastore, we need to check if metastore supports
     // the table name and database name we have for this query. MetaStoreUtils.validateName
     // is the method used by Hive to check if a table name or a database name is valid for
@@ -72,7 +72,7 @@ case class CreateDataSourceTableCommand(
     }
 
     val tableName = tableIdent.unquotedString
-    val sessionState = sqlContext.sessionState
+    val sessionState = sparkSession.sessionState
 
     if (sessionState.catalog.tableExists(tableIdent)) {
       if (ignoreIfExists) {
@@ -93,14 +93,14 @@ case class CreateDataSourceTableCommand(
 
     // Create the relation to validate the arguments before writing the metadata to the metastore.
     DataSource(
-      sqlContext = sqlContext,
+      sparkSession = sparkSession,
       userSpecifiedSchema = userSpecifiedSchema,
       className = provider,
       bucketSpec = None,
       options = optionsWithPath).resolveRelation()
 
     CreateDataSourceTableUtils.createDataSourceTable(
-      sqlContext = sqlContext,
+      sparkSession = sparkSession,
       tableIdent = tableIdent,
       userSpecifiedSchema = userSpecifiedSchema,
       partitionColumns = Array.empty[String],
@@ -136,7 +136,7 @@ case class CreateDataSourceTableAsSelectCommand(
     query: LogicalPlan)
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
     // Since we are saving metadata to metastore, we need to check if metastore supports
     // the table name and database name we have for this query. MetaStoreUtils.validateName
     // is the method used by Hive to check if a table name or a database name is valid for
@@ -153,7 +153,7 @@ case class CreateDataSourceTableAsSelectCommand(
     }
 
     val tableName = tableIdent.unquotedString
-    val sessionState = sqlContext.sessionState
+    val sessionState = sparkSession.sessionState
     var createMetastoreTable = false
     var isExternal = true
     val optionsWithPath =
@@ -165,7 +165,7 @@ case class CreateDataSourceTableAsSelectCommand(
       }
 
     var existingSchema = None: Option[StructType]
-    if (sqlContext.sessionState.catalog.tableExists(tableIdent)) {
+    if (sparkSession.sessionState.catalog.tableExists(tableIdent)) {
       // Check if we need to throw an exception or just return.
       mode match {
         case SaveMode.ErrorIfExists =>
@@ -180,7 +180,7 @@ case class CreateDataSourceTableAsSelectCommand(
         case SaveMode.Append =>
           // Check if the specified data source match the data source of the existing table.
           val dataSource = DataSource(
-            sqlContext = sqlContext,
+            sparkSession = sparkSession,
             userSpecifiedSchema = Some(query.schema.asNullable),
             partitionColumns = partitionColumns,
             bucketSpec = bucketSpec,
@@ -197,7 +197,7 @@ case class CreateDataSourceTableAsSelectCommand(
               throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
           }
         case SaveMode.Overwrite =>
-          sqlContext.sql(s"DROP TABLE IF EXISTS $tableName")
+          sparkSession.sql(s"DROP TABLE IF EXISTS $tableName")
           // Need to create the table again.
           createMetastoreTable = true
       }
@@ -206,7 +206,7 @@ case class CreateDataSourceTableAsSelectCommand(
       createMetastoreTable = true
     }
 
-    val data = Dataset.ofRows(sqlContext, query)
+    val data = Dataset.ofRows(sparkSession, query)
     val df = existingSchema match {
       // If we are inserting into an existing table, just use the existing schema.
       case Some(s) => data.selectExpr(s.fieldNames: _*)
@@ -215,7 +215,7 @@ case class CreateDataSourceTableAsSelectCommand(
 
     // Create the relation based on the data of df.
     val dataSource = DataSource(
-      sqlContext,
+      sparkSession,
       className = provider,
       partitionColumns = partitionColumns,
       bucketSpec = bucketSpec,
@@ -228,7 +228,7 @@ case class CreateDataSourceTableAsSelectCommand(
       // the schema of df). It is important since the nullability may be changed by the relation
       // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
       CreateDataSourceTableUtils.createDataSourceTable(
-        sqlContext = sqlContext,
+        sparkSession = sparkSession,
         tableIdent = tableIdent,
         userSpecifiedSchema = Some(result.schema),
         partitionColumns = partitionColumns,
@@ -260,7 +260,7 @@ object CreateDataSourceTableUtils extends Logging {
   }
 
   def createDataSourceTable(
-      sqlContext: SQLContext,
+      sparkSession: SparkSession,
       tableIdent: TableIdentifier,
       userSpecifiedSchema: Option[StructType],
       partitionColumns: Array[String],
@@ -275,7 +275,7 @@ object CreateDataSourceTableUtils extends Logging {
     // stored into a single metastore SerDe property.  In this case, we split the JSON string and
     // store each part as a separate SerDe property.
     userSpecifiedSchema.foreach { schema =>
-      val threshold = sqlContext.sessionState.conf.schemaStringLengthThreshold
+      val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
       val schemaJsonString = schema.json
       // Split the JSON string.
       val parts = schemaJsonString.grouped(threshold).toSeq
@@ -329,10 +329,10 @@ object CreateDataSourceTableUtils extends Logging {
       CatalogTableType.MANAGED_TABLE
     }
 
-    val maybeSerDe = HiveSerDe.sourceToSerDe(provider, sqlContext.sessionState.conf)
+    val maybeSerDe = HiveSerDe.sourceToSerDe(provider, sparkSession.sessionState.conf)
     val dataSource =
       DataSource(
-        sqlContext,
+        sparkSession,
         userSpecifiedSchema = userSpecifiedSchema,
         partitionColumns = partitionColumns,
         bucketSpec = bucketSpec,
@@ -432,7 +432,7 @@ object CreateDataSourceTableUtils extends Logging {
         // specific way.
         try {
           logInfo(message)
-          sqlContext.sessionState.catalog.createTable(table, ignoreIfExists = false)
+          sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
         } catch {
           case NonFatal(e) =>
             val warningMessage =
@@ -440,13 +440,13 @@ object CreateDataSourceTableUtils extends Logging {
                 s"it into Hive metastore in Spark SQL specific format."
             logWarning(warningMessage, e)
             val table = newSparkSQLSpecificMetastoreTable()
-            sqlContext.sessionState.catalog.createTable(table, ignoreIfExists = false)
+            sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
         }
 
       case (None, message) =>
         logWarning(message)
         val table = newSparkSQLSpecificMetastoreTable()
-        sqlContext.sessionState.catalog.createTable(table, ignoreIfExists = false)
+        sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
     }
   }
 }
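
DataSource likewise now carries the SparkSession as its first field (see the DataSource.scala hunk further down). A hedged sketch of a call site in the named-argument style used above; the format name, path, and helper are illustrative only:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.datasources.DataSource
    import org.apache.spark.sql.sources.BaseRelation

    // "parquet" and the path are placeholders; the named arguments mirror the
    // call sites in the hunk above.
    def resolveParquet(spark: SparkSession, path: String): BaseRelation =
      DataSource(
        sparkSession = spark,
        className = "parquet",
        userSpecifiedSchema = None,
        bucketSpec = None,
        options = Map("path" -> path)).resolveRelation()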

http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala
index 33cc10d..cefe0f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.types.StringType
 
@@ -38,10 +38,10 @@ case class ShowDatabasesCommand(databasePattern: Option[String]) extends Runnabl
     AttributeReference("result", StringType, nullable = false)() :: Nil
   }
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val catalog = sqlContext.sessionState.catalog
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
     val databases =
-      databasePattern.map(catalog.listDatabases(_)).getOrElse(catalog.listDatabases())
+      databasePattern.map(catalog.listDatabases).getOrElse(catalog.listDatabases())
     databases.map { d => Row(d) }
   }
 }
@@ -55,8 +55,8 @@ case class ShowDatabasesCommand(databasePattern: Option[String]) extends Runnabl
  */
 case class SetDatabaseCommand(databaseName: String) extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.sessionState.catalog.setCurrentDatabase(databaseName)
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    sparkSession.sessionState.catalog.setCurrentDatabase(databaseName)
     Seq.empty[Row]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 85f0066..f5aa8fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
 import scala.util.control.NonFatal
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
@@ -38,8 +38,8 @@ import org.apache.spark.sql.types._
  */
 abstract class NativeDDLCommand(val sql: String) extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.runNativeSql(sql)
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    sparkSession.runNativeSql(sql)
   }
 
   override val output: Seq[Attribute] = {
@@ -66,8 +66,8 @@ case class CreateDatabase(
     props: Map[String, String])
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val catalog = sqlContext.sessionState.catalog
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
     catalog.createDatabase(
       CatalogDatabase(
         databaseName,
@@ -104,8 +104,8 @@ case class DropDatabase(
     cascade: Boolean)
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.sessionState.catalog.dropDatabase(databaseName, ifExists, cascade)
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    sparkSession.sessionState.catalog.dropDatabase(databaseName, ifExists, cascade)
     Seq.empty[Row]
   }
 
@@ -126,8 +126,8 @@ case class AlterDatabaseProperties(
     props: Map[String, String])
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val catalog = sqlContext.sessionState.catalog
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
     val db: CatalogDatabase = catalog.getDatabaseMetadata(databaseName)
     catalog.alterDatabase(db.copy(properties = db.properties ++ props))
 
@@ -152,9 +152,9 @@ case class DescribeDatabase(
     extended: Boolean)
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
     val dbMetadata: CatalogDatabase =
-      sqlContext.sessionState.catalog.getDatabaseMetadata(databaseName)
+      sparkSession.sessionState.catalog.getDatabaseMetadata(databaseName)
     val result =
       Row("Database Name", dbMetadata.name) ::
         Row("Description", dbMetadata.description) ::
@@ -193,8 +193,8 @@ case class DropTable(
     ifExists: Boolean,
     isView: Boolean) extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val catalog = sqlContext.sessionState.catalog
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
     if (!catalog.tableExists(tableName)) {
       if (!ifExists) {
         val objectName = if (isView) "View" else "Table"
@@ -213,7 +213,7 @@ case class DropTable(
         case _ =>
       })
       try {
-        sqlContext.cacheManager.tryUncacheQuery(sqlContext.table(tableName.quotedString))
+        sparkSession.cacheManager.tryUncacheQuery(sparkSession.table(tableName.quotedString))
       } catch {
         case NonFatal(e) => log.warn(s"${e.getMessage}", e)
       }
@@ -239,8 +239,8 @@ case class AlterTableSetProperties(
     isView: Boolean)
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val catalog = sqlContext.sessionState.catalog
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
     DDLUtils.verifyAlterTableType(catalog, tableName, isView)
     val table = catalog.getTableMetadata(tableName)
     val newProperties = table.properties ++ properties
@@ -271,8 +271,8 @@ case class AlterTableUnsetProperties(
     isView: Boolean)
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val catalog = sqlContext.sessionState.catalog
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
     DDLUtils.verifyAlterTableType(catalog, tableName, isView)
     val table = catalog.getTableMetadata(tableName)
     if (DDLUtils.isDatasourceTable(table)) {
@@ -315,8 +315,8 @@ case class AlterTableSerDeProperties(
   require(serdeClassName.isDefined || serdeProperties.isDefined,
     "alter table attempted to set neither serde class name nor serde properties")
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val catalog = sqlContext.sessionState.catalog
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
     // Do not support setting serde for datasource tables
     if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) {
@@ -350,8 +350,8 @@ case class AlterTableAddPartition(
     ifNotExists: Boolean)
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val catalog = sqlContext.sessionState.catalog
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
     if (DDLUtils.isDatasourceTable(table)) {
       throw new AnalysisException(
@@ -381,8 +381,8 @@ case class AlterTableRenamePartition(
     newPartition: TablePartitionSpec)
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.sessionState.catalog.renamePartitions(
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    sparkSession.sessionState.catalog.renamePartitions(
       tableName, Seq(oldPartition), Seq(newPartition))
     Seq.empty[Row]
   }
@@ -409,8 +409,8 @@ case class AlterTableDropPartition(
     ifExists: Boolean)
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val catalog = sqlContext.sessionState.catalog
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
     if (DDLUtils.isDatasourceTable(table)) {
       throw new AnalysisException(
@@ -446,8 +446,8 @@ case class AlterTableSetLocation(
     location: String)
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val catalog = sqlContext.sessionState.catalog
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
     partitionSpec match {
       case Some(spec) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index 89ccacd..5aa779d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogFunction
 import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionInfo}
@@ -47,8 +47,8 @@ case class CreateFunction(
     isTemp: Boolean)
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val catalog = sqlContext.sessionState.catalog
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
     if (isTemp) {
       if (databaseName.isDefined) {
         throw new AnalysisException(
@@ -99,7 +99,7 @@ case class DescribeFunction(
     }
   }
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
     // Hard code "<>", "!=", "between", and "case" for now as there is no corresponding functions.
     functionName.toLowerCase match {
       case "<>" =>
@@ -116,7 +116,7 @@ case class DescribeFunction(
         Row(s"Function: case") ::
           Row(s"Usage: CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END - " +
             s"When a = b, returns c; when a = d, return e; else return f") :: Nil
-      case _ => sqlContext.sessionState.functionRegistry.lookupFunction(functionName) match {
+      case _ => sparkSession.sessionState.functionRegistry.lookupFunction(functionName) match {
         case Some(info) =>
           val result =
             Row(s"Function: ${info.getName}") ::
@@ -149,8 +149,8 @@ case class DropFunction(
     isTemp: Boolean)
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val catalog = sqlContext.sessionState.catalog
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
     if (isTemp) {
       if (databaseName.isDefined) {
         throw new AnalysisException(
@@ -187,12 +187,12 @@ case class ShowFunctions(db: Option[String], pattern: Option[String]) extends Ru
     schema.toAttributes
   }
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val dbName = db.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase)
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val dbName = db.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase)
     // If pattern is not specified, we use '*', which is used to
     // match any sequence of characters (including no characters).
     val functionNames =
-      sqlContext.sessionState.catalog
+      sparkSession.sessionState.catalog
         .listFunctions(dbName, pattern.getOrElse("*"))
         .map(_.unquotedString)
     // The session catalog caches some persistent functions in the FunctionRegistry

http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
index fc7ecb1..29bcb30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 
@@ -31,8 +31,8 @@ case class AddJar(path: String) extends RunnableCommand {
     schema.toAttributes
   }
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.sessionState.addJar(path)
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    sparkSession.sessionState.addJar(path)
     Seq(Row(0))
   }
 }
@@ -41,8 +41,8 @@ case class AddJar(path: String) extends RunnableCommand {
  * Adds a file to the current session so it can be used.
  */
 case class AddFile(path: String) extends RunnableCommand {
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.sparkContext.addFile(path)
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    sparkSession.sparkContext.addFile(path)
     Seq.empty[Row]
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 5cac9d8..700a704 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -22,7 +22,7 @@ import java.net.URI
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType, ExternalCatalog}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -60,8 +60,8 @@ case class CreateTableLike(
     sourceTable: TableIdentifier,
     ifNotExists: Boolean) extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val catalog = sqlContext.sessionState.catalog
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
     if (!catalog.tableExists(sourceTable)) {
       throw new AnalysisException(
         s"Source table in CREATE TABLE LIKE does not exist: '$sourceTable'")
@@ -109,8 +109,8 @@ case class CreateTableLike(
  */
 case class CreateTable(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.sessionState.catalog.createTable(table, ifNotExists)
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    sparkSession.sessionState.catalog.createTable(table, ifNotExists)
     Seq.empty[Row]
   }
 
@@ -132,8 +132,8 @@ case class AlterTableRename(
     isView: Boolean)
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val catalog = sqlContext.sessionState.catalog
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
     DDLUtils.verifyAlterTableType(catalog, oldName, isView)
     catalog.invalidateTable(oldName)
     catalog.renameTable(oldName, newName)
@@ -158,8 +158,8 @@ case class LoadData(
     isOverwrite: Boolean,
     partition: Option[ExternalCatalog.TablePartitionSpec]) extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val catalog = sqlContext.sessionState.catalog
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
     if (!catalog.tableExists(table)) {
       throw new AnalysisException(
         s"Table in LOAD DATA does not exist: '$table'")
@@ -210,7 +210,7 @@ case class LoadData(
           // Follow Hive's behavior:
           // If no schema or authority is provided with non-local inpath,
           // we will use hadoop configuration "fs.default.name".
-          val defaultFSConf = sqlContext.sessionState.hadoopConf.get("fs.default.name")
+          val defaultFSConf = sparkSession.sessionState.hadoopConf.get("fs.default.name")
           val defaultFS = if (defaultFSConf == null) {
             new URI("")
           } else {
@@ -285,9 +285,9 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean)
       new MetadataBuilder().putString("comment", "comment of the column").build())()
   )
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
     val result = new ArrayBuffer[Row]
-    sqlContext.sessionState.catalog.lookupRelation(table) match {
+    sparkSession.sessionState.catalog.lookupRelation(table) match {
       case catalogRelation: CatalogRelation =>
         catalogRelation.catalogTable.schema.foreach { column =>
           result += Row(column.name, column.dataType, column.comment.orNull)
@@ -333,10 +333,10 @@ case class ShowTablesCommand(
       AttributeReference("isTemporary", BooleanType, nullable = false)() :: Nil
   }
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
     // Since we need to return a Seq of rows, we will call getTables directly
-    // instead of calling tables in sqlContext.
-    val catalog = sqlContext.sessionState.catalog
+    // instead of calling tables in sparkSession.
+    val catalog = sparkSession.sessionState.catalog
     val db = databaseName.getOrElse(catalog.getCurrentDatabase)
     val tables =
       tableIdentifierPattern.map(catalog.listTables(db, _)).getOrElse(catalog.listTables(db))
@@ -368,13 +368,13 @@ case class ShowTablePropertiesCommand(table: TableIdentifier, propertyKey: Optio
     }
   }
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val catalog = sqlContext.sessionState.catalog
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
 
     if (catalog.isTemporaryTable(table)) {
       Seq.empty[Row]
     } else {
-      val catalogTable = sqlContext.sessionState.catalog.getTableMetadata(table)
+      val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(table)
 
       propertyKey match {
         case Some(p) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 07cc4a9..f42b56f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command
 
 import scala.util.control.NonFatal
 
-import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.SQLBuilder
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
@@ -62,14 +62,14 @@ case class CreateViewCommand(
       "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
   }
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
     // If the plan cannot be analyzed, throw an exception and don't proceed.
-    val qe = sqlContext.executePlan(child)
+    val qe = sparkSession.executePlan(child)
     qe.assertAnalyzed()
     val analyzedPlan = qe.analyzed
 
     require(tableDesc.schema == Nil || tableDesc.schema.length == analyzedPlan.output.length)
-    val sessionState = sqlContext.sessionState
+    val sessionState = sparkSession.sessionState
 
     if (sessionState.catalog.tableExists(tableIdentifier)) {
       if (allowExisting) {
@@ -77,7 +77,7 @@ case class CreateViewCommand(
         // already exists.
       } else if (replace) {
         // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
-        sessionState.catalog.alterTable(prepareTable(sqlContext, analyzedPlan))
+        sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
       } else {
         // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
         // exists.
@@ -88,7 +88,7 @@ case class CreateViewCommand(
     } else {
       // Create the view if it doesn't exist.
       sessionState.catalog.createTable(
-        prepareTable(sqlContext, analyzedPlan), ignoreIfExists = false)
+        prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
     }
 
     Seq.empty[Row]
@@ -98,9 +98,9 @@ case class CreateViewCommand(
    * Returns a [[CatalogTable]] that can be used to save in the catalog. This comment canonicalize
    * SQL based on the analyzed plan, and also creates the proper schema for the view.
    */
-  private def prepareTable(sqlContext: SQLContext, analyzedPlan: LogicalPlan): CatalogTable = {
+  private def prepareTable(sparkSession: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
     val viewSQL: String =
-      if (sqlContext.conf.canonicalView) {
+      if (sparkSession.sessionState.conf.canonicalView) {
         val logicalPlan =
           if (tableDesc.schema.isEmpty) {
             analyzedPlan
@@ -108,7 +108,7 @@ case class CreateViewCommand(
             val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
               case (attr, col) => Alias(attr, col.name)()
             }
-            sqlContext.executePlan(Project(projectList, analyzedPlan)).analyzed
+            sparkSession.executePlan(Project(projectList, analyzedPlan)).analyzed
           }
         new SQLBuilder(logicalPlan).toSQL
       } else {
@@ -134,7 +134,7 @@ case class CreateViewCommand(
     // Validate the view SQL - make sure we can parse it and analyze it.
     // If we cannot analyze the generated query, there is probably a bug in SQL generation.
     try {
-      sqlContext.sql(viewSQL).queryExecution.assertAnalyzed()
+      sparkSession.sql(viewSQL).queryExecution.assertAnalyzed()
     } catch {
       case NonFatal(e) =>
         throw new RuntimeException(

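The views.scala hunks show the general shape of the command-side change: run() now receives a SparkSession, and analysis goes through sparkSession.executePlan / sparkSession.sql instead of the SQLContext equivalents. A minimal sketch of that calling pattern follows; the helper name is hypothetical, and executePlan is Spark-internal API (exactly as used in this patch), so the sketch is illustrative rather than user-facing code.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Hypothetical helper mirroring CreateViewCommand.run: everything that used to
    // be looked up on sqlContext is now looked up on the SparkSession.
    def analyzeChild(sparkSession: SparkSession, child: LogicalPlan): LogicalPlan = {
      val qe = sparkSession.executePlan(child) // internal API, as used in this patch
      qe.assertAnalyzed()                      // throws AnalysisException if the plan is invalid
      qe.analyzed
    }
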
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 4e7214c..ef626ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -59,7 +59,7 @@ import org.apache.spark.util.Utils
  * @param bucketSpec An optional specification for bucketing (hash-partitioning) of the data.
  */
 case class DataSource(
-    sqlContext: SQLContext,
+    sparkSession: SparkSession,
     className: String,
     paths: Seq[String] = Nil,
     userSpecifiedSchema: Option[StructType] = None,
@@ -131,15 +131,15 @@ case class DataSource(
     val allPaths = caseInsensitiveOptions.get("path")
     val globbedPaths = allPaths.toSeq.flatMap { path =>
       val hdfsPath = new Path(path)
-      val fs = hdfsPath.getFileSystem(sqlContext.sessionState.hadoopConf)
+      val fs = hdfsPath.getFileSystem(sparkSession.sessionState.hadoopConf)
       val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
       SparkHadoopUtil.get.globPathIfNecessary(qualified)
     }.toArray
 
-    val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths, None)
+    val fileCatalog: FileCatalog = new HDFSFileCatalog(sparkSession, options, globbedPaths, None)
     userSpecifiedSchema.orElse {
       format.inferSchema(
-        sqlContext,
+        sparkSession,
         caseInsensitiveOptions,
         fileCatalog.allFiles())
     }.getOrElse {
@@ -151,7 +151,8 @@ case class DataSource(
   private def sourceSchema(): SourceInfo = {
     providingClass.newInstance() match {
       case s: StreamSourceProvider =>
-        val (name, schema) = s.sourceSchema(sqlContext, userSpecifiedSchema, className, options)
+        val (name, schema) = s.sourceSchema(
+          sparkSession.wrapped, userSpecifiedSchema, className, options)
         SourceInfo(name, schema)
 
       case format: FileFormat =>
@@ -171,7 +172,7 @@ case class DataSource(
   def createSource(metadataPath: String): Source = {
     providingClass.newInstance() match {
       case s: StreamSourceProvider =>
-        s.createSource(sqlContext, metadataPath, userSpecifiedSchema, className, options)
+        s.createSource(sparkSession.wrapped, metadataPath, userSpecifiedSchema, className, options)
 
       case format: FileFormat =>
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
@@ -183,16 +184,16 @@ case class DataSource(
           val newOptions = options.filterKeys(_ != "path") + ("basePath" -> path)
           val newDataSource =
             DataSource(
-              sqlContext,
+              sparkSession,
               paths = files,
               userSpecifiedSchema = Some(sourceInfo.schema),
               className = className,
               options = new CaseInsensitiveMap(newOptions))
-          Dataset.ofRows(sqlContext, LogicalRelation(newDataSource.resolveRelation()))
+          Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
         }
 
         new FileStreamSource(
-          sqlContext, metadataPath, path, sourceInfo.schema, dataFrameBuilder)
+          sparkSession, metadataPath, path, sourceInfo.schema, dataFrameBuilder)
       case _ =>
         throw new UnsupportedOperationException(
           s"Data source $className does not support streamed reading")
@@ -202,14 +203,14 @@ case class DataSource(
   /** Returns a sink that can be used to continually write data. */
   def createSink(): Sink = {
     providingClass.newInstance() match {
-      case s: StreamSinkProvider => s.createSink(sqlContext, options, partitionColumns)
+      case s: StreamSinkProvider => s.createSink(sparkSession.wrapped, options, partitionColumns)
       case format: FileFormat =>
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
         })
 
-        new FileStreamSink(sqlContext, path, format)
+        new FileStreamSink(sparkSession, path, format)
       case _ =>
         throw new UnsupportedOperationException(
           s"Data source $className does not support streamed writing")
@@ -225,7 +226,7 @@ case class DataSource(
       case Seq(singlePath) =>
         try {
           val hdfsPath = new Path(singlePath)
-          val fs = hdfsPath.getFileSystem(sqlContext.sessionState.hadoopConf)
+          val fs = hdfsPath.getFileSystem(sparkSession.sessionState.hadoopConf)
           val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir)
           val res = fs.exists(metadataPath)
           res
@@ -244,9 +245,9 @@ case class DataSource(
     val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
       // TODO: Throw when too much is given.
       case (dataSource: SchemaRelationProvider, Some(schema)) =>
-        dataSource.createRelation(sqlContext, caseInsensitiveOptions, schema)
+        dataSource.createRelation(sparkSession.wrapped, caseInsensitiveOptions, schema)
       case (dataSource: RelationProvider, None) =>
-        dataSource.createRelation(sqlContext, caseInsensitiveOptions)
+        dataSource.createRelation(sparkSession.wrapped, caseInsensitiveOptions)
       case (_: SchemaRelationProvider, None) =>
         throw new AnalysisException(s"A schema needs to be specified when using $className.")
       case (_: RelationProvider, Some(_)) =>
@@ -257,11 +258,10 @@ case class DataSource(
       case (format: FileFormat, _)
           if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
-        val fileCatalog =
-          new StreamFileCatalog(sqlContext, basePath)
+        val fileCatalog = new StreamFileCatalog(sparkSession, basePath)
         val dataSchema = userSpecifiedSchema.orElse {
           format.inferSchema(
-            sqlContext,
+            sparkSession,
             caseInsensitiveOptions,
             fileCatalog.allFiles())
         }.getOrElse {
@@ -271,7 +271,7 @@ case class DataSource(
         }
 
         HadoopFsRelation(
-          sqlContext,
+          sparkSession,
           fileCatalog,
           partitionSchema = fileCatalog.partitionSpec().partitionColumns,
           dataSchema = dataSchema,
@@ -284,7 +284,7 @@ case class DataSource(
         val allPaths = caseInsensitiveOptions.get("path") ++ paths
         val globbedPaths = allPaths.flatMap { path =>
           val hdfsPath = new Path(path)
-          val fs = hdfsPath.getFileSystem(sqlContext.sessionState.hadoopConf)
+          val fs = hdfsPath.getFileSystem(sparkSession.sessionState.hadoopConf)
           val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
           val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
 
@@ -311,11 +311,11 @@ case class DataSource(
         }
 
         val fileCatalog: FileCatalog =
-          new HDFSFileCatalog(sqlContext, options, globbedPaths, partitionSchema)
+          new HDFSFileCatalog(sparkSession, options, globbedPaths, partitionSchema)
 
         val dataSchema = userSpecifiedSchema.map { schema =>
           val equality =
-            if (sqlContext.conf.caseSensitiveAnalysis) {
+            if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
               org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
             } else {
               org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
@@ -324,7 +324,7 @@ case class DataSource(
           StructType(schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
         }.orElse {
           format.inferSchema(
-            sqlContext,
+            sparkSession,
             caseInsensitiveOptions,
             fileCatalog.allFiles())
         }.getOrElse {
@@ -334,10 +334,10 @@ case class DataSource(
         }
 
         val enrichedOptions =
-          format.prepareRead(sqlContext, caseInsensitiveOptions, fileCatalog.allFiles())
+          format.prepareRead(sparkSession, caseInsensitiveOptions, fileCatalog.allFiles())
 
         HadoopFsRelation(
-          sqlContext,
+          sparkSession,
           fileCatalog,
           partitionSchema = fileCatalog.partitionSpec().partitionColumns,
           dataSchema = dataSchema.asNullable,
@@ -363,7 +363,7 @@ case class DataSource(
 
     providingClass.newInstance() match {
       case dataSource: CreatableRelationProvider =>
-        dataSource.createRelation(sqlContext, mode, options, data)
+        dataSource.createRelation(sparkSession.wrapped, mode, options, data)
       case format: FileFormat =>
         // Don't glob path for the write path.  The contracts here are:
         //  1. Only one output path can be specified on the write path;
@@ -374,11 +374,11 @@ case class DataSource(
           val path = new Path(caseInsensitiveOptions.getOrElse("path", {
             throw new IllegalArgumentException("'path' is not specified")
           }))
-          val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf)
+          val fs = path.getFileSystem(sparkSession.sessionState.hadoopConf)
           path.makeQualified(fs.getUri, fs.getWorkingDirectory)
         }
 
-        val caseSensitive = sqlContext.conf.caseSensitiveAnalysis
+        val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
         PartitioningUtils.validatePartitionColumnDataTypes(
           data.schema, partitionColumns, caseSensitive)
 
@@ -421,7 +421,7 @@ case class DataSource(
             options,
             data.logicalPlan,
             mode)
-        sqlContext.executePlan(plan).toRdd
+        sparkSession.executePlan(plan).toRdd
 
       case _ =>
         sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")

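DataSource now carries a SparkSession, but the public provider interfaces (RelationProvider, StreamSourceProvider, StreamSinkProvider, CreatableRelationProvider) still accept a SQLContext, which is why every call into them unwraps the session with sparkSession.wrapped. A minimal sketch of that bridging pattern, assuming the provider instance and options map are supplied by the caller:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.sources.{BaseRelation, RelationProvider}

    // Internal code holds a SparkSession; external provider APIs keep SQLContext.
    // sparkSession.wrapped (the session's SQLContext view in this patch) bridges the two.
    def resolveWithProvider(
        sparkSession: SparkSession,
        provider: RelationProvider,
        options: Map[String, String]): BaseRelation =
      provider.createRelation(sparkSession.wrapped, options)
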
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 60238bd..f7f68b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable
 import org.apache.spark.{Partition => RDDPartition, TaskContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{InputFileNameHolder, RDD}
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.vectorized.ColumnarBatch
 
@@ -51,10 +51,10 @@ case class PartitionedFile(
 case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends RDDPartition
 
 class FileScanRDD(
-    @transient val sqlContext: SQLContext,
+    @transient private val sparkSession: SparkSession,
     readFunction: (PartitionedFile) => Iterator[InternalRow],
     @transient val filePartitions: Seq[FilePartition])
-  extends RDD[InternalRow](sqlContext.sparkContext, Nil) {
+  extends RDD[InternalRow](sparkSession.sparkContext, Nil) {
 
   override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {
     val iterator = new Iterator[Object] with AutoCloseable {

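FileScanRDD keeps the session as a @transient private field and touches it only on the driver, to reach sparkContext, so serializing the RDD for tasks never captures the SparkSession. A self-contained sketch of the same pattern using only public APIs (the class itself is hypothetical):

    import org.apache.spark.{Partition, TaskContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession

    // The session is @transient: it is dereferenced once, on the driver, to obtain
    // the SparkContext passed to the RDD constructor; executors never see it.
    class EmptyStringRDD(@transient private val sparkSession: SparkSession)
      extends RDD[String](sparkSession.sparkContext, Nil) {

      override def compute(split: Partition, context: TaskContext): Iterator[String] =
        Iterator.empty

      override protected def getPartitions: Array[Partition] = Array.empty
    }
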
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 751daa0..9e1308b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -74,14 +74,14 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       }
 
       val partitionColumns =
-        l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver)
+        l.resolve(files.partitionSchema, files.sparkSession.sessionState.analyzer.resolver)
       val partitionSet = AttributeSet(partitionColumns)
       val partitionKeyFilters =
         ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
       logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
 
       val dataColumns =
-        l.resolve(files.dataSchema, files.sqlContext.sessionState.analyzer.resolver)
+        l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver)
 
       // Partition keys are not available in the statistics of the files.
       val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
@@ -107,7 +107,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
 
       val readFile = files.fileFormat.buildReader(
-        sqlContext = files.sqlContext,
+        sparkSession = files.sparkSession,
         dataSchema = files.dataSchema,
         partitionSchema = files.partitionSchema,
         requiredSchema = prunedDataSchema,
@@ -115,7 +115,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         options = files.options)
 
       val plannedPartitions = files.bucketSpec match {
-        case Some(bucketing) if files.sqlContext.conf.bucketingEnabled =>
+        case Some(bucketing) if files.sparkSession.sessionState.conf.bucketingEnabled =>
           logInfo(s"Planning with ${bucketing.numBuckets} buckets")
           val bucketed =
             selectedPartitions.flatMap { p =>
@@ -134,9 +134,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
           }
 
         case _ =>
-          val defaultMaxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
-          val openCostInBytes = files.sqlContext.conf.filesOpenCostInBytes
-          val defaultParallelism = files.sqlContext.sparkContext.defaultParallelism
+          val defaultMaxSplitBytes = files.sparkSession.sessionState.conf.filesMaxPartitionBytes
+          val openCostInBytes = files.sparkSession.sessionState.conf.filesOpenCostInBytes
+          val defaultParallelism = files.sparkSession.sparkContext.defaultParallelism
           val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
           val bytesPerCore = totalBytes / defaultParallelism
           val maxSplitBytes = Math.min(defaultMaxSplitBytes,
@@ -195,7 +195,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         DataSourceScanExec.create(
           readDataColumns ++ partitionColumns,
           new FileScanRDD(
-            files.sqlContext,
+            files.sparkSession,
             readFile,
             plannedPartitions),
           files,

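In FileSourceStrategy the configuration reads simply move from files.sqlContext.conf to files.sparkSession.sessionState.conf, and the parallelism comes from files.sparkSession.sparkContext. The non-bucketed branch is cut off by the hunk boundary above, so the arithmetic it performs is sketched separately below, following the usual formulation (the minimum of the max-partition size and the larger of the open cost and the per-core share); treat it as a sketch rather than the exact committed code.

    // Standalone sketch of the split-size arithmetic in the non-bucketed branch.
    def maxSplitBytes(
        defaultMaxSplitBytes: Long, // spark.sql.files.maxPartitionBytes
        openCostInBytes: Long,      // spark.sql.files.openCostInBytes
        totalBytes: Long,           // sum of file lengths, plus one open cost per file
        defaultParallelism: Int): Long = {
      val bytesPerCore = totalBytes / defaultParallelism
      Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    }
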
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
index 37c2c45..7b15e49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
@@ -32,15 +32,15 @@ private[sql] case class InsertIntoDataSource(
     overwrite: Boolean)
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
     val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
-    val data = Dataset.ofRows(sqlContext, query)
+    val data = Dataset.ofRows(sparkSession, query)
     // Apply the schema of the existing table to the new data.
-    val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
+    val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
     relation.insert(df, overwrite)
 
     // Invalidate the cache.
-    sqlContext.cacheManager.invalidateCache(logicalRelation)
+    sparkSession.cacheManager.invalidateCache(logicalRelation)
 
     Seq.empty[Row]
   }

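InsertIntoDataSource follows the same substitution: the DataFrame for the query is built with Dataset.ofRows(sparkSession, query) and the cached copy of the target relation is dropped through sparkSession.cacheManager. A condensed, hypothetical version of that run body (the schema re-application via internalCreateDataFrame is omitted, and both Dataset.ofRows and cacheManager are internal APIs as shown above):

    import org.apache.spark.sql.{Dataset, Row, SparkSession}
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.sources.InsertableRelation

    // Build the data from the session, insert it, then invalidate any cache entry
    // that still points at the old contents of the target relation.
    def insertAndInvalidate(
        sparkSession: SparkSession,
        target: InsertableRelation,
        targetPlan: LogicalPlan,
        query: LogicalPlan,
        overwrite: Boolean): Seq[Row] = {
      target.insert(Dataset.ofRows(sparkSession, query), overwrite)
      sparkSession.cacheManager.invalidateCache(targetPlan)
      Seq.empty[Row]
    }
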
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index a636ca2..b2483e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -68,7 +68,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
 
   override def children: Seq[LogicalPlan] = query :: Nil
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
     // Most formats don't do well with duplicate columns, so let's not allow that
     if (query.schema.fieldNames.length != query.schema.fieldNames.distinct.length) {
       val duplicateColumns = query.schema.fieldNames.groupBy(identity).collect {
@@ -78,7 +78,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
           s"cannot save to file.")
     }
 
-    val hadoopConf = new Configuration(sqlContext.sessionState.hadoopConf)
+    val hadoopConf = new Configuration(sparkSession.sessionState.hadoopConf)
     val fs = outputPath.getFileSystem(hadoopConf)
     val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
 
@@ -111,14 +111,14 @@ private[sql] case class InsertIntoHadoopFsRelation(
       val partitionSet = AttributeSet(partitionColumns)
       val dataColumns = query.output.filterNot(partitionSet.contains)
 
-      val queryExecution = Dataset.ofRows(sqlContext, query).queryExecution
-      SQLExecution.withNewExecutionId(sqlContext, queryExecution) {
+      val queryExecution = Dataset.ofRows(sparkSession, query).queryExecution
+      SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
         val relation =
           WriteRelation(
-            sqlContext,
+            sparkSession,
             dataColumns.toStructType,
             qualifiedOutputPath.toString,
-            fileFormat.prepareWrite(sqlContext, _, options, dataColumns.toStructType),
+            fileFormat.prepareWrite(sparkSession, _, options, dataColumns.toStructType),
             bucketSpec)
 
         val writerContainer = if (partitionColumns.isEmpty && bucketSpec.isEmpty) {
@@ -131,7 +131,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
             dataColumns = dataColumns,
             inputSchema = query.output,
             PartitioningUtils.DEFAULT_PARTITION_NAME,
-            sqlContext.conf.getConf(SQLConf.PARTITION_MAX_FILES),
+            sparkSession.getConf(SQLConf.PARTITION_MAX_FILES),
             isAppend)
         }
 
@@ -140,7 +140,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
         writerContainer.driverSideSetup()
 
         try {
-          sqlContext.sparkContext.runJob(queryExecution.toRdd, writerContainer.writeRows _)
+          sparkSession.sparkContext.runJob(queryExecution.toRdd, writerContainer.writeRows _)
           writerContainer.commitJob()
           refreshFunction()
         } catch { case cause: Throwable =>

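For the write path the Hadoop Configuration is now copied from sparkSession.sessionState.hadoopConf, and the job runs under SQLExecution.withNewExecutionId(sparkSession, queryExecution). The path-qualification step uses only public Hadoop APIs, so it can be sketched standalone (the Configuration here is assumed to be supplied by the caller):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // Resolve the fully qualified output path against its file system, as the
    // command does right after copying the session's Hadoop configuration.
    def qualifiedOutputPath(outputPath: Path, hadoopConf: Configuration): Path = {
      val fs = outputPath.getFileSystem(hadoopConf)
      outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
    }
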
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index b9527db..3b064a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.InternalRow
@@ -36,9 +36,10 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
+
 /** A container for all the details required when writing to a table. */
 case class WriteRelation(
-    sqlContext: SQLContext,
+    sparkSession: SparkSession,
     dataSchema: StructType,
     path: String,
     prepareJobForWrite: Job => OutputWriterFactory,
@@ -66,7 +67,7 @@ private[sql] abstract class BaseWriterContainer(
   @transient private val jobContext: JobContext = job
 
   private val speculationEnabled: Boolean =
-    relation.sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)
+    relation.sparkSession.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)
 
   // The following fields are initialized and used on both driver and executor side.
   @transient protected var outputCommitter: OutputCommitter = _

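WriteRelation now carries the SparkSession, and driver-side settings such as speculation are read from the session's SparkContext. Roughly the same check with the public SparkConf accessor (the patch itself goes through the internal sparkContext.conf field):

    import org.apache.spark.sql.SparkSession

    // Reading a core Spark setting through the session, as BaseWriterContainer does.
    def speculationEnabled(sparkSession: SparkSession): Boolean =
      sparkSession.sparkContext.getConf.getBoolean("spark.speculation", defaultValue = false)
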
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index 7d407a7..fb047ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce._
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.JoinedRow
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
@@ -49,14 +49,14 @@ class DefaultSource extends FileFormat with DataSourceRegister {
   override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource]
 
   override def inferSchema(
-      sqlContext: SQLContext,
+      sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
     val csvOptions = new CSVOptions(options)
 
     // TODO: Move filtering.
     val paths = files.filterNot(_.getPath.getName startsWith "_").map(_.getPath.toString)
-    val rdd = baseRdd(sqlContext, csvOptions, paths)
+    val rdd = baseRdd(sparkSession, csvOptions, paths)
     val firstLine = findFirstLine(csvOptions, rdd)
     val firstRow = new LineCsvReader(csvOptions).parseLine(firstLine)
 
@@ -66,7 +66,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
       firstRow.zipWithIndex.map { case (value, index) => s"C$index" }
     }
 
-    val parsedRdd = tokenRdd(sqlContext, csvOptions, header, paths)
+    val parsedRdd = tokenRdd(sparkSession, csvOptions, header, paths)
     val schema = if (csvOptions.inferSchemaFlag) {
       CSVInferSchema.infer(parsedRdd, header, csvOptions.nullValue)
     } else {
@@ -80,7 +80,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
   }
 
   override def prepareWrite(
-      sqlContext: SQLContext,
+      sparkSession: SparkSession,
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
@@ -94,7 +94,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
   }
 
   override def buildReader(
-      sqlContext: SQLContext,
+      sparkSession: SparkSession,
       dataSchema: StructType,
       partitionSchema: StructType,
       requiredSchema: StructType,
@@ -103,8 +103,8 @@ class DefaultSource extends FileFormat with DataSourceRegister {
     val csvOptions = new CSVOptions(options)
     val headers = requiredSchema.fields.map(_.name)
 
-    val conf = new Configuration(sqlContext.sessionState.hadoopConf)
-    val broadcastedConf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
+    val conf = new Configuration(sparkSession.sessionState.hadoopConf)
+    val broadcastedConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
 
     (file: PartitionedFile) => {
       val lineIterator = {
@@ -134,18 +134,18 @@ class DefaultSource extends FileFormat with DataSourceRegister {
   }
 
   private def baseRdd(
-      sqlContext: SQLContext,
+      sparkSession: SparkSession,
       options: CSVOptions,
       inputPaths: Seq[String]): RDD[String] = {
-    readText(sqlContext, options, inputPaths.mkString(","))
+    readText(sparkSession, options, inputPaths.mkString(","))
   }
 
   private def tokenRdd(
-      sqlContext: SQLContext,
+      sparkSession: SparkSession,
       options: CSVOptions,
       header: Array[String],
       inputPaths: Seq[String]): RDD[Array[String]] = {
-    val rdd = baseRdd(sqlContext, options, inputPaths)
+    val rdd = baseRdd(sparkSession, options, inputPaths)
     // Make sure firstLine is materialized before sending to executors
     val firstLine = if (options.headerFlag) findFirstLine(options, rdd) else null
     CSVRelation.univocityTokenizer(rdd, header, firstLine, options)
@@ -168,14 +168,14 @@ class DefaultSource extends FileFormat with DataSourceRegister {
   }
 
   private def readText(
-      sqlContext: SQLContext,
+      sparkSession: SparkSession,
       options: CSVOptions,
       location: String): RDD[String] = {
     if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
-      sqlContext.sparkContext.textFile(location)
+      sparkSession.sparkContext.textFile(location)
     } else {
       val charset = options.charset
-      sqlContext.sparkContext
+      sparkSession.sparkContext
         .hadoopFile[LongWritable, Text, TextInputFormat](location)
         .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
     }

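In the CSV source, buildReader copies sparkSession.sessionState.hadoopConf and broadcasts it via sparkSession.sparkContext (wrapped in SerializableConfiguration, which is Spark-internal), while readText switches on the charset. The charset branch uses only public SparkContext APIs, so here is a close, self-contained sketch of it:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession

    // UTF-8 input can go through textFile directly; any other charset reads the raw
    // bytes with hadoopFile and decodes them explicitly, as readText above does.
    def readText(sparkSession: SparkSession, charset: String, location: String): RDD[String] =
      if (charset == "UTF-8") {
        sparkSession.sparkContext.textFile(location)
      } else {
        sparkSession.sparkContext
          .hadoopFile[LongWritable, Text, TextInputFormat](location)
          .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
      }
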
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index e7e94bb..7d0a3d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -74,15 +74,15 @@ case class CreateTempTableUsing(
       s"Temporary table '$tableIdent' should not have specified a database")
   }
 
-  def run(sqlContext: SQLContext): Seq[Row] = {
+  def run(sparkSession: SparkSession): Seq[Row] = {
     val dataSource = DataSource(
-      sqlContext,
+      sparkSession,
       userSpecifiedSchema = userSpecifiedSchema,
       className = provider,
       options = options)
-    sqlContext.sessionState.catalog.createTempTable(
+    sparkSession.sessionState.catalog.createTempTable(
       tableIdent.table,
-      Dataset.ofRows(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan,
+      Dataset.ofRows(sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan,
       overrideIfExists = true)
 
     Seq.empty[Row]
@@ -102,18 +102,18 @@ case class CreateTempTableUsingAsSelect(
       s"Temporary table '$tableIdent' should not have specified a database")
   }
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val df = Dataset.ofRows(sqlContext, query)
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val df = Dataset.ofRows(sparkSession, query)
     val dataSource = DataSource(
-      sqlContext,
+      sparkSession,
       className = provider,
       partitionColumns = partitionColumns,
       bucketSpec = None,
       options = options)
     val result = dataSource.write(mode, df)
-    sqlContext.sessionState.catalog.createTempTable(
+    sparkSession.sessionState.catalog.createTempTable(
       tableIdent.table,
-      Dataset.ofRows(sqlContext, LogicalRelation(result)).logicalPlan,
+      Dataset.ofRows(sparkSession, LogicalRelation(result)).logicalPlan,
       overrideIfExists = true)
 
     Seq.empty[Row]
@@ -123,23 +123,23 @@ case class CreateTempTableUsingAsSelect(
 case class RefreshTable(tableIdent: TableIdentifier)
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
     // Refresh the given table's metadata first.
-    sqlContext.sessionState.catalog.refreshTable(tableIdent)
+    sparkSession.sessionState.catalog.refreshTable(tableIdent)
 
     // If this table is cached as an InMemoryColumnarRelation, drop the original
     // cached version and make the new version cached lazily.
-    val logicalPlan = sqlContext.sessionState.catalog.lookupRelation(tableIdent)
+    val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
     // Use lookupCachedData directly since RefreshTable also takes databaseName.
-    val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
+    val isCached = sparkSession.cacheManager.lookupCachedData(logicalPlan).nonEmpty
     if (isCached) {
       // Create a data frame to represent the table.
       // TODO: Use uncacheTable once it supports database name.
-      val df = Dataset.ofRows(sqlContext, logicalPlan)
+      val df = Dataset.ofRows(sparkSession, logicalPlan)
       // Uncache the logicalPlan.
-      sqlContext.cacheManager.tryUncacheQuery(df, blocking = true)
+      sparkSession.cacheManager.tryUncacheQuery(df, blocking = true)
       // Cache it again.
-      sqlContext.cacheManager.cacheQuery(df, Some(tableIdent.table))
+      sparkSession.cacheManager.cacheQuery(df, Some(tableIdent.table))
     }
 
     Seq.empty[Row]

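The ddl.scala commands close the loop: temp tables are registered through sparkSession.sessionState.catalog, and RefreshTable drops and lazily re-caches through sparkSession.cacheManager. For orientation, a rough public-API equivalent (released Spark 2.x, not the internal path the commands take) of CREATE TEMPORARY TABLE ... USING ... looks like the sketch below; provider, options, and viewName are placeholders.

    import org.apache.spark.sql.SparkSession

    // Resolve the data source through the reader API, then register the result
    // under a temporary name visible to SQL in this session.
    def createTempTableUsing(
        spark: SparkSession,
        viewName: String,
        provider: String,
        options: Map[String, String]): Unit =
      spark.read.format(provider).options(options).load().createOrReplaceTempView(viewName)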
