[1/3] spark git commit: [SPARK-8906][SQL] Move all internal data source classes into execution.datasources.

Repository: spark
Updated Branches:
  refs/heads/master 9ba7c64de -> 60c0ce134
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
deleted file mode 100644
index 5c6ef2d..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ /dev/null
@@ -1,581 +0,0 @@
-package org.apache.spark.sql.sources
-import java.util.{Date, UUID}
-import scala.collection.JavaConversions.asScalaIterator
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
-import org.apache.spark._
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.types.StringType
-import org.apache.spark.util.SerializableConfiguration
-/**
- * Command that inserts the result of `query` into the [[InsertableRelation]] wrapped by
- * `logicalRelation`, passing the `overwrite` flag through to the relation.
- */
-private[sql] case class InsertIntoDataSource(
-    logicalRelation: LogicalRelation,
-    query: LogicalPlan,
-    overwrite: Boolean)
-  extends RunnableCommand {
-  /** Performs the insertion, invalidates the cache for `logicalRelation`, returns no rows. */
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val target = logicalRelation.relation.asInstanceOf[InsertableRelation]
-    // Re-wrap the rows produced by the query using the schema of the existing table.
-    val queryRows = DataFrame(sqlContext, query).queryExecution.toRdd
-    val insertable = sqlContext.internalCreateDataFrame(queryRows, logicalRelation.schema)
-    target.insert(insertable, overwrite)
-    // Anything cached for this relation is stale once the insert has happened.
-    sqlContext.cacheManager.invalidateCache(logicalRelation)
-    Seq.empty[Row]
-  }
-/**
- * A command for writing data to a [[HadoopFsRelation]].  Supports both overwriting and appending.
- * Writing to dynamic partitions is also supported.  Each [[InsertIntoHadoopFsRelation]] issues a
- * single write job, and owns a UUID that identifies this job.  Each concrete implementation of
- * [[HadoopFsRelation]] should use this UUID together with task id to generate unique file path for
- * each task output file.  This UUID is passed to executor side via a property named
- * `spark.sql.sources.writeJobUUID`.
- *
- * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]]
- * are used to write to normal tables and tables with dynamic partitions.
- *
- * Basic work flow of this command is:
- *
- *   1. Driver side setup, including output committer initialization and data source specific
- *      preparation work for the write job to be issued.
- *   2. Issues a write job consisting of one or more executor side tasks, each of which writes
- *      all rows within an RDD partition.
- *   3. If no exception is thrown in a task, commits that task, otherwise aborts that task;  If any
- *      exception is thrown during task commitment, also aborts that task.
- *   4. If all tasks are committed, commit the job, otherwise aborts the job;  If any exception is
- *      thrown during job commitment, also aborts the job.
- */
-private[sql] case class InsertIntoHadoopFsRelation(
-    @transient relation: HadoopFsRelation,
-    @transient query: LogicalPlan,
-    mode: SaveMode)
-  extends RunnableCommand {
-  /**
-   * Writes the result of `query` to the single output path of `relation`, honoring `mode`.
-   *
-   * Depending on `mode` and on whether the qualified output path already exists, this either
-   * fails, deletes the existing path first, skips the write, or simply proceeds.  The write
-   * itself is delegated to `insert` (no partition columns) or `insertWithDynamicPartitions`.
-   * Always returns an empty sequence of rows.
-   */
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    require(
-      relation.paths.length == 1,
-      s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
-    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
-    val outputPath = new Path(relation.paths.head)
-    val fs = outputPath.getFileSystem(hadoopConf)
-    val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-    val pathExists = fs.exists(qualifiedOutputPath)
-    val doInsertion = (mode, pathExists) match {
-      case (SaveMode.ErrorIfExists, true) =>
-        sys.error(s"path $qualifiedOutputPath already exists.")
-      case (SaveMode.Overwrite, true) =>
-        // Overwriting an existing destination: remove it recursively before writing.
-        fs.delete(qualifiedOutputPath, true)
-        true
-      case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
-        true
-      case (SaveMode.Ignore, exists) =>
-        !exists
-    }
-    // If we are appending data to an existing dir.
-    val isAppend = pathExists && (mode == SaveMode.Append)
-    if (doInsertion) {
-      val job = new Job(hadoopConf)
-      job.setOutputKeyClass(classOf[Void])
-      job.setOutputValueClass(classOf[InternalRow])
-      FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
-      // We create a DataFrame by applying the schema of relation to the data to make sure that
-      // we are writing data based on the expected schema.
-      val df = {
-        // For partitioned relation r, r.schema's column ordering can be different from the column
-        // ordering of data.logicalPlan (partition columns are all moved after data column). We
-        // need a Project to adjust the ordering, so that inside InsertIntoHadoopFsRelation, we can
-        // safely apply the schema of r.schema to the data.
-        val project = Project(
-          relation.schema.map(field => new UnresolvedAttribute(Seq(field.name))), query)
-        sqlContext.internalCreateDataFrame(
-          DataFrame(sqlContext, project).queryExecution.toRdd, relation.schema)
-      }
-      val partitionColumns = relation.partitionColumns.fieldNames
-      if (partitionColumns.isEmpty) {
-        insert(new DefaultWriterContainer(relation, job, isAppend), df)
-      } else {
-        val writerContainer = new DynamicPartitionWriterContainer(
-          relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
-        insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
-      }
-    }
-    Seq.empty[Row]
-  }
-  /**
-   * Inserts the content of the [[DataFrame]] into a table without any partitioning columns.
-   *
-   * Runs one Spark job over the rows of `df`; each task writes all rows of its partition through
-   * the writer returned by `writerContainer` and then commits the task.  If the job succeeds the
-   * job is committed and the relation refreshed; any failure aborts the job and is rethrown
-   * wrapped in a [[SparkException]].
-   */
-  private def insert(writerContainer: BaseWriterContainer, df: DataFrame): Unit = {
-    // Uses local vals for serialization
-    val needsConversion = relation.needConversion
-    val dataSchema = relation.dataSchema
-    // This call shouldn't be put into the `try` block below because it only initializes and
-    // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
-    writerContainer.driverSideSetup()
-    try {
-      df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
-      writerContainer.commitJob()
-      relation.refresh()
-    } catch { case cause: Throwable =>
-      logError("Aborting job.", cause)
-      writerContainer.abortJob()
-      throw new SparkException("Job aborted.", cause)
-    }
-    // Task body: writes every row of one RDD partition, then commits (or aborts) the task.
-    def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
-      // If anything below fails, we should abort the task.
-      try {
-        writerContainer.executorSideSetup(taskContext)
-        val converter: InternalRow => Row = if (needsConversion) {
-          CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
-        } else {
-          r: InternalRow => r.asInstanceOf[Row]
-        }
-        while (iterator.hasNext) {
-          val internalRow = iterator.next()
-          writerContainer.outputWriterForRow(internalRow).write(converter(internalRow))
-        }
-        writerContainer.commitTask()
-      } catch { case cause: Throwable =>
-        logError("Aborting task.", cause)
-        writerContainer.abortTask()
-        throw new SparkException("Task failed while writing rows.", cause)
-      }
-    }
-  }
-  /**
-   * Inserts the content of the [[DataFrame]] into a table with partitioning columns.
-   *
-   * Requires that `df` has exactly the schema of the relation and that `partitionColumns` equals
-   * the relation's partition column names.  Each row is split into a partition part (partition
-   * columns cast to strings, used to pick the output writer) and a data part (the remaining
-   * columns, which is what gets written).  Job and task commit/abort handling is the same as in
-   * `insert`.
-   */
-  private def insertWithDynamicPartitions(
-      sqlContext: SQLContext,
-      writerContainer: BaseWriterContainer,
-      df: DataFrame,
-      partitionColumns: Array[String]): Unit = {
-    // Uses a local val for serialization
-    val needsConversion = relation.needConversion
-    val dataSchema = relation.dataSchema
-    require(
-      df.schema == relation.schema,
-      s"""DataFrame must have the same schema as the relation to which is inserted.
-         |DataFrame schema: ${df.schema}
-         |Relation schema: ${relation.schema}
-       """.stripMargin)
-    val partitionColumnsInSpec = relation.partitionColumns.fieldNames
-    require(
-      partitionColumnsInSpec.sameElements(partitionColumns),
-      s"""Partition columns mismatch.
-         |Expected: ${partitionColumnsInSpec.mkString(", ")}
-         |Actual: ${partitionColumns.mkString(", ")}
-       """.stripMargin)
-    val output = df.queryExecution.executedPlan.output
-    // Split the output attributes into partition columns and ordinary data columns by name.
-    val (partitionOutput, dataOutput) = output.partition(a => partitionColumns.contains(a.name))
-    val codegenEnabled = df.sqlContext.conf.codegenEnabled
-    // This call shouldn't be put into the `try` block below because it only initializes and
-    // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
-    writerContainer.driverSideSetup()
-    try {
-      df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
-      writerContainer.commitJob()
-      relation.refresh()
-    } catch { case cause: Throwable =>
-      logError("Aborting job.", cause)
-      writerContainer.abortJob()
-      throw new SparkException("Job aborted.", cause)
-    }
-    // Task body: routes every row of one RDD partition to the writer of its table partition.
-    def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
-      // If anything below fails, we should abort the task.
-      try {
-        writerContainer.executorSideSetup(taskContext)
-        // Projects all partition columns and casts them to strings to build partition directories.
-        val partitionCasts = partitionOutput.map(Cast(_, StringType))
-        val partitionProj = newProjection(codegenEnabled, partitionCasts, output)
-        val dataProj = newProjection(codegenEnabled, dataOutput, output)
-        val dataConverter: InternalRow => Row = if (needsConversion) {
-          CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
-        } else {
-          r: InternalRow => r.asInstanceOf[Row]
-        }
-        while (iterator.hasNext) {
-          val internalRow = iterator.next()
-          val partitionPart = partitionProj(internalRow)
-          val dataPart = dataConverter(dataProj(internalRow))
-          writerContainer.outputWriterForRow(partitionPart).write(dataPart)
-        }
-        writerContainer.commitTask()
-      } catch { case cause: Throwable =>
-        logError("Aborting task.", cause)
-        writerContainer.abortTask()
-        throw new SparkException("Task failed while writing rows.", cause)
-      }
-    }
-  }
-  // This is copied from SparkPlan, probably should move this to a more general place.
-  //
-  // Builds a Projection for `expressions` over `inputSchema`.  With codegen enabled a generated
-  // projection is tried first; if generation fails the exception is rethrown when the
-  // `spark.testing` system property is set, otherwise an interpreted projection is used instead.
-  private def newProjection(
-      codegenEnabled: Boolean,
-      expressions: Seq[Expression],
-      inputSchema: Seq[Attribute]): Projection = {
-    log.debug(
-      s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
-    if (!codegenEnabled) {
-      new InterpretedProjection(expressions, inputSchema)
-    } else {
-      try {
-        GenerateProjection.generate(expressions, inputSchema)
-      } catch {
-        case e: Exception if sys.props.contains("spark.testing") =>
-          throw e
-        case e: Exception =>
-          log.error("failed to generate projection, fallback to interpreted", e)
-          new InterpretedProjection(expressions, inputSchema)
-      }
-    }
-  }
-/**
- * Base class of the writer containers used when writing to a [[HadoopFsRelation]].  It holds the
- * serializable copy of the job configuration, the UUID of the write job, and the job/task IDs,
- * task attempt context and output committer that `driverSideSetup()` and `executorSideSetup()`
- * initialize.
- */
-private[sql] abstract class BaseWriterContainer(
-    @transient val relation: HadoopFsRelation,
-    @transient job: Job,
-    isAppend: Boolean)
-  extends SparkHadoopMapReduceUtil
-  with Logging
-  with Serializable {
-  // Serializable wrapper around the job's Hadoop configuration.
-  protected val serializableConf = new SerializableConfiguration(job.getConfiguration)
-  // This UUID is used to avoid output file name collision between different appending write jobs.
-  // These jobs may belong to different SparkContext instances. Concrete data source implementations
-  // may use this UUID to generate unique file names (e.g., `part-r-<task-id>-<job-uuid>.parquet`).
-  // The reason why this ID is used to identify a job rather than a single task output file is
-  // that speculative tasks must generate the same output file name as the original task.
-  private val uniqueWriteJobId = UUID.randomUUID()
-  // This is only used on driver side.
-  @transient private val jobContext: JobContext = job
-  // The following fields are initialized and used on both driver and executor side.
-  @transient protected var outputCommitter: OutputCommitter = _
-  @transient private var jobId: JobID = _
-  @transient private var taskId: TaskID = _
-  @transient private var taskAttemptId: TaskAttemptID = _
-  @transient protected var taskAttemptContext: TaskAttemptContext = _
-  // The single destination path of the relation; writing to several paths is rejected.
-  protected val outputPath: String = {
-    assert(
-      relation.paths.length == 1,
-      s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
-    relation.paths.head
-  }
-  protected val dataSchema = relation.dataSchema
-  // Both are assigned in `driverSideSetup()`.
-  protected var outputWriterFactory: OutputWriterFactory = _
-  private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _
-  /**
-   * Prepares the write job on the driver: sets up IDs and configuration, publishes the write job
-   * UUID in the job configuration, lets the relation prepare the job, and finally creates the
-   * output committer and calls its `setupJob`.
-   */
-  def driverSideSetup(): Unit = {
-    setupIDs(0, 0, 0)
-    setupConf()
-    // This UUID is sent to executor side together with the serialized `Configuration` object within
-    // the `Job` instance.  `OutputWriters` on the executor side should use this UUID to generate
-    // unique task output files.
-    job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
-    // Order of the following two lines is important.  For Hadoop 1, TaskAttemptContext constructor
-    // clones the Configuration object passed in.  If we initialize the TaskAttemptContext first,
-    // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
-    //
-    // Also, the `prepareJobForWrite` call must happen before initializing output format and output
-    // committer, since their initialization involve the job configuration, which can be potentially
-    // decorated in `prepareJobForWrite`.
-    outputWriterFactory = relation.prepareJobForWrite(job)
-    taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
-    outputFormatClass = job.getOutputFormatClass
-    outputCommitter = newOutputCommitter(taskAttemptContext)
-    outputCommitter.setupJob(jobContext)
-  }
-  /**
-   * Prepares a single task on the executor: derives the IDs from the stage ID, partition ID and
-   * attempt number of `taskContext`, creates the task attempt context and output committer,
-   * calls the committer's `setupTask`, and finally lets the subclass create its writers.
-   */
-  def executorSideSetup(taskContext: TaskContext): Unit = {
-    setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber())
-    setupConf()
-    taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
-    outputCommitter = newOutputCommitter(taskAttemptContext)
-    outputCommitter.setupTask(taskAttemptContext)
-    // Writers are created last, so they can rely on the committer and task attempt context.
-    initWriters()
-  }
-  // Directory that task output should be written to: the committer's temporary work path when
-  // the committer is a FileOutputCommitter, otherwise the final output path itself.
-  protected def getWorkPath: String = outputCommitter match {
-    case fileCommitter: MapReduceFileOutputCommitter => fileCommitter.getWorkPath.toString
-    case _ => outputPath
-  }
-  // Chooses the output committer for this write.  Appends always use the committer of the output
-  // format; otherwise a committer class configured under SQLConf.OUTPUT_COMMITTER_CLASS wins, and
-  // the output format's committer is the fallback.
-  private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
-    val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
-    if (isAppend) {
-      // If we are appending data to an existing dir, we will only use the output committer
-      // associated with the file output format since it is not safe to use a custom
-      // committer for appending. For example, in S3, direct parquet output committer may
-      // leave partial data in the destination dir when the appending job fails.
-      logInfo(
-        s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
-        "for appending.")
-      defaultOutputCommitter
-    } else {
-      val configuredClass = context.getConfiguration.getClass(
-        SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
-      Option(configuredClass) match {
-        case Some(clazz) =>
-          logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
-          // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
-          // has an associated output committer. To override this output committer,
-          // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
-          // If a data source needs to override the output committer, it needs to set the
-          // output committer in prepareForWrite method.
-          if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
-            // A FileOutputCommitter subclass: use its (Path, TaskAttemptContext) constructor.
-            val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-            ctor.newInstance(new Path(outputPath), context)
-          } else {
-            // A plain OutputCommitter: use its no-argument constructor.
-            val ctor = clazz.getDeclaredConstructor()
-            ctor.newInstance()
-          }
-        case None =>
-          // No committer class configured: use the one associated with the file output format.
-          logInfo(
-            s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
-          defaultOutputCommitter
-      }
-    }
-  }
-  // Derives the Hadoop job, task and task attempt IDs from the given numeric IDs.
-  private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
-    val hadoopJobId = SparkHadoopWriter.createJobID(new Date, jobId)
-    this.jobId = hadoopJobId
-    val hadoopTaskId = new TaskID(hadoopJobId, true, splitId)
-    this.taskId = hadoopTaskId
-    this.taskAttemptId = new TaskAttemptID(hadoopTaskId, attemptId)
-  }
-  // Publishes the current job/task/attempt IDs in the Hadoop configuration under the `mapred.*`
-  // property names.  Must be called after `setupIDs`, which assigns the IDs read here.
-  private def setupConf(): Unit = {
-    serializableConf.value.set("mapred.job.id", jobId.toString)
-    serializableConf.value.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
-    serializableConf.value.set("mapred.task.id", taskAttemptId.toString)
-    serializableConf.value.setBoolean("mapred.task.is.map", true)
-    serializableConf.value.setInt("mapred.task.partition", 0)
-  }
-  // Called on executor side when writing rows.  Returns the writer that the caller should use
-  // for the row it is about to write; implementations decide whether `row` is inspected.
-  def outputWriterForRow(row: InternalRow): OutputWriter
-  // Hook invoked at the end of `executorSideSetup`; subclasses create their writer state here.
-  protected def initWriters(): Unit
-  // Commits the current task by delegating to SparkHadoopMapRedUtil.commitTask with the output
-  // committer, the task attempt context and the numeric job, task and attempt IDs.
-  def commitTask(): Unit = {
-    SparkHadoopMapRedUtil.commitTask(
-      outputCommitter, taskAttemptContext, jobId.getId, taskId.getId, taskAttemptId.getId)
-  }
-  // Aborts the current task attempt.  The committer is skipped while it is still unset, i.e.
-  // when no setup method has assigned it yet.
-  def abortTask(): Unit = {
-    Option(outputCommitter).foreach(_.abortTask(taskAttemptContext))
-    logError(s"Task attempt $taskAttemptId aborted.")
-  }
-  // Commits the whole write job through the output committer and logs the job ID.
-  def commitJob(): Unit = {
-    outputCommitter.commitJob(jobContext)
-    logInfo(s"Job $jobId committed.")
-  }
-  // Aborts the whole write job with state FAILED.  The committer is skipped while it is still
-  // unset, i.e. when no setup method has assigned it yet.
-  def abortJob(): Unit = {
-    Option(outputCommitter).foreach(_.abortJob(jobContext, JobStatus.State.FAILED))
-    logError(s"Job $jobId aborted.")
-  }
-/**
- * Writer container that sends every row of a task to one single [[OutputWriter]].
- */
-private[sql] class DefaultWriterContainer(
-    @transient relation: HadoopFsRelation,
-    @transient job: Job,
-    isAppend: Boolean)
-  extends BaseWriterContainer(relation, job, isAppend) {
-  // Created by `initWriters()`; remains null until then.
-  @transient private var writer: OutputWriter = _
-  override protected def initWriters(): Unit = {
-    val conf = taskAttemptContext.getConfiguration
-    conf.set("spark.sql.sources.output.path", outputPath)
-    writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
-  }
-  // All rows share the same writer, so `row` is not inspected.
-  override def outputWriterForRow(row: InternalRow): OutputWriter = writer
-  // Closes the writer and then commits the task; any failure is rethrown as RuntimeException.
-  override def commitTask(): Unit = {
-    try {
-      assert(writer != null, "OutputWriter instance should have been initialized")
-      writer.close()
-      super.commitTask()
-    } catch {
-      case cause: Throwable =>
-        // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and
-        // will cause `abortTask()` to be invoked.
-        throw new RuntimeException("Failed to commit task", cause)
-    }
-  }
-  // Closes the writer if there is one, and always aborts the task afterwards.
-  override def abortTask(): Unit = {
-    try {
-      // It's possible that the task fails before `writer` gets initialized
-      Option(writer).foreach(_.close())
-    } finally {
-      super.abortTask()
-    }
-  }
-/**
- * Writer container for tables with dynamic partitions.  It keeps one [[OutputWriter]] per
- * partition directory and creates each writer the first time a row for that partition is seen.
- */
-private[sql] class DynamicPartitionWriterContainer(
-    @transient relation: HadoopFsRelation,
-    @transient job: Job,
-    partitionColumns: Array[String],
-    defaultPartitionName: String,
-    isAppend: Boolean)
-  extends BaseWriterContainer(relation, job, isAppend) {
-  // All output writers are created on executor side.  The map itself is only created by
-  // `initWriters()`, so it is still null if task setup fails before that point.
-  @transient protected var outputWriters: java.util.HashMap[String, OutputWriter] = _
-  override protected def initWriters(): Unit = {
-    outputWriters = new java.util.HashMap[String, OutputWriter]
-  }
-  // The `row` argument is supposed to only contain partition column values which have been casted
-  // to strings.
-  override def outputWriterForRow(row: InternalRow): OutputWriter = {
-    // Builds the relative partition directory, e.g. `a=1/b=2`, from the partition values.
-    val partitionPath = {
-      val partitionPathBuilder = new StringBuilder
-      var i = 0
-      while (i < partitionColumns.length) {
-        val col = partitionColumns(i)
-        val partitionValueString = {
-          val string = row.getString(i)
-          // Null partition values are written under the default partition name.
-          if (string.eq(null)) defaultPartitionName else PartitioningUtils.escapePathName(string)
-        }
-        if (i > 0) {
-          partitionPathBuilder.append(Path.SEPARATOR_CHAR)
-        }
-        partitionPathBuilder.append(s"$col=$partitionValueString")
-        i += 1
-      }
-      partitionPathBuilder.toString()
-    }
-    val writer = outputWriters.get(partitionPath)
-    if (writer.eq(null)) {
-      // First row for this partition: create its writer under the work path and remember it.
-      val path = new Path(getWorkPath, partitionPath)
-      taskAttemptContext.getConfiguration.set(
-        "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
-      val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
-      outputWriters.put(partitionPath, newWriter)
-      newWriter
-    } else {
-      writer
-    }
-  }
-  // Closes and forgets all writers.  `outputWriters` is null when the task failed before
-  // `initWriters()` ran; without the null check `abortTask()` would then throw a
-  // NullPointerException that hides the original failure.
-  private def clearOutputWriters(): Unit = {
-    if (outputWriters != null && !outputWriters.isEmpty) {
-      asScalaIterator(outputWriters.values().iterator()).foreach(_.close())
-      outputWriters.clear()
-    }
-  }
-  // Closes all writers and then commits the task; any failure is rethrown as RuntimeException.
-  override def commitTask(): Unit = {
-    try {
-      clearOutputWriters()
-      super.commitTask()
-    } catch { case cause: Throwable =>
-      throw new RuntimeException("Failed to commit task", cause)
-    }
-  }
-  // Closes whatever writers exist and always aborts the task afterwards.
-  override def abortTask(): Unit = {
-    try {
-      clearOutputWriters()
-    } finally {
-      super.abortTask()
-    }
-  }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
deleted file mode 100644
index 5a8c97c..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ /dev/null
@@ -1,493 +0,0 @@
-package org.apache.spark.sql.sources
-import scala.language.{existentials, implicitConversions}
-import scala.util.matching.Regex
-import org.apache.hadoop.fs.Path
-import org.apache.spark.Logging
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext, SaveMode}
-import org.apache.spark.util.Utils
-/**
- * A parser for foreign DDL commands.
- */
-private[sql] class DDLParser(
-    parseQuery: String => LogicalPlan)
-  extends AbstractSparkSQLParser with DataTypeParser with Logging {
-  /**
-   * Parses `input` as a DDL command.  A [[DDLException]] is always rethrown.  Any other failure
-   * is rethrown when `exceptionOnError` is true; otherwise `input` is handed to `parseQuery`.
-   */
-  def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
-    try {
-      parse(input)
-    } catch {
-      case ddlException: DDLException =>
-        throw ddlException
-      case _: Throwable if !exceptionOnError =>
-        // Not a DDL statement: fall back to the regular query parser.
-        parseQuery(input)
-    }
-  }
-  // Keyword is a convention with AbstractSparkSQLParser, which scans all of the `Keyword`
-  // properties of this class via reflection at runtime to construct the SqlLexical object.
-  protected val CREATE = Keyword("CREATE")
-  protected val TEMPORARY = Keyword("TEMPORARY")
-  protected val TABLE = Keyword("TABLE")
-  protected val IF = Keyword("IF")
-  protected val NOT = Keyword("NOT")
-  protected val EXISTS = Keyword("EXISTS")
-  protected val USING = Keyword("USING")
-  protected val OPTIONS = Keyword("OPTIONS")
-  protected val DESCRIBE = Keyword("DESCRIBE")
-  protected val EXTENDED = Keyword("EXTENDED")
-  protected val AS = Keyword("AS")
-  protected val COMMENT = Keyword("COMMENT")
-  protected val REFRESH = Keyword("REFRESH")
-  // A DDL statement is one of the three supported commands, tried in this order.
-  protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable
-  // Entry point of the grammar.
-  protected def start: Parser[LogicalPlan] = ddl
-  /**
-   * Parses a `CREATE [TEMPORARY] TABLE` statement for a data source table.  Accepted forms:
-   *
-   * `CREATE [TEMPORARY] TABLE [IF NOT EXISTS] avroTable
-   * USING org.apache.spark.sql.avro
-   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
-   * or
-   * `CREATE [TEMPORARY] TABLE [IF NOT EXISTS] avroTable(intField int, stringField string...)
-   * USING org.apache.spark.sql.avro
-   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
-   * or
-   * `CREATE [TEMPORARY] TABLE [IF NOT EXISTS] avroTable
-   * USING org.apache.spark.sql.avro
-   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")
-   * AS SELECT ...`
-   *
-   * `TEMPORARY` cannot be combined with `IF NOT EXISTS`, and column definitions cannot be
-   * combined with `AS SELECT`; both combinations raise a [[DDLException]].
-   */
-  protected lazy val createTable: Parser[LogicalPlan] =
-    // TODO: Support database.table.
-    (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~
-      tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
-      case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query =>
-        if (temp.isDefined && allowExisting.isDefined) {
-          throw new DDLException(
-            "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
-        }
-        val options = opts.getOrElse(Map.empty[String, String])
-        query match {
-          case Some(queryText) =>
-            if (columns.isDefined) {
-              throw new DDLException(
-                "a CREATE TABLE AS SELECT statement does not allow column definitions.")
-            }
-            // IF NOT EXISTS maps to SaveMode.Ignore, a temporary table overwrites, and a plain
-            // CREATE TABLE AS SELECT fails if the table already exists.
-            val mode = if (allowExisting.isDefined) {
-              SaveMode.Ignore
-            } else if (temp.isDefined) {
-              SaveMode.Overwrite
-            } else {
-              SaveMode.ErrorIfExists
-            }
-            val queryPlan = parseQuery(queryText)
-            CreateTableUsingAsSelect(tableName,
-              provider,
-              temp.isDefined,
-              Array.empty[String],
-              mode,
-              options,
-              queryPlan)
-          case None =>
-            val userSpecifiedSchema = columns.map(fields => StructType(fields))
-            CreateTableUsing(
-              tableName,
-              userSpecifiedSchema,
-              provider,
-              temp.isDefined,
-              options,
-              allowExisting.isDefined,
-              managedIfNoPath = false)
-        }
-    }
-  // A parenthesized, comma-separated list of column definitions.
-  protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"
-  /*
-   * describe [extended] table avroTable
-   * Parses `DESCRIBE [EXTENDED] [db.]table` into a DescribeCommand over the unresolved table;
-   * the command's flag is set when EXTENDED was given.
-   */
-  protected lazy val describeTable: Parser[LogicalPlan] =
-    (DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident  ^^ {
-      case extended ~ database ~ table =>
-        // The identifier is `db, table` when a database was given, otherwise just `table`.
-        val tblIdentifier = database.fold(Seq(table))(dbName => Seq(dbName, table))
-        DescribeCommand(UnresolvedRelation(tblIdentifier, None), extended.isDefined)
-   }
-  // Parses `REFRESH TABLE [db.]table`; the database name is "default" when none is given.
-  protected lazy val refreshTable: Parser[LogicalPlan] =
-    REFRESH ~> TABLE ~> (ident <~ ".").? ~ ident ^^ {
-      case Some(databaseName) ~ tableName => RefreshTable(databaseName, tableName)
-      case None ~ tableName => RefreshTable("default", tableName)
-    }
-  // A parenthesized, comma-separated list of `key "value"` pairs, collected into a map.
-  protected lazy val options: Parser[Map[String, String]] =
-    "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }
-  // A dot-separated sequence of identifiers, joined back into one dotted name.
-  protected lazy val className: Parser[String] = repsep(ident, ".") ^^ (_.mkString("."))
-  // Overrides the implicit regex-to-parser conversion: the resulting parser accepts a single
-  // `Identifier` or `Keyword` token whose text is accepted by the regex's `unapplySeq`, and
-  // yields that text.
-  override implicit def regexToParser(regex: Regex): Parser[String] = acceptMatch(
-    s"identifier matching regex $regex", {
-      case lexical.Identifier(str) if regex.unapplySeq(str).isDefined => str
-      case lexical.Keyword(str) if regex.unapplySeq(str).isDefined => str
-    }
-  )
-  // One segment of an option name: a letter or underscore followed by letters, digits or
-  // underscores.  The regex is turned into a parser by the implicit `regexToParser`.
-  protected lazy val optionPart: Parser[String] = "[_a-zA-Z][_a-zA-Z0-9]*".r ^^ {
-    case name => name
-  }
-  // A full option name: one or more `optionPart`s separated by dots, joined back with dots.
-  protected lazy val optionName: Parser[String] =
-    repsep(optionPart, ".") ^^ (_.mkString("."))
-  // A single option: an option name followed by a string literal value.
-  protected lazy val pair: Parser[(String, String)] =
-    optionName ~ stringLit ^^ { case key ~ value => key -> value }
-  // A column definition: name, data type and an optional `COMMENT "..."`.  The comment, when
-  // present, is stored in the field metadata under the lower-cased COMMENT keyword; the field
-  // is always nullable.
-  protected lazy val column: Parser[StructField] =
-    ident ~ dataType ~ (COMMENT ~> stringLit).?  ^^ { case columnName ~ typ ~ cm =>
-      val meta = cm.fold(Metadata.empty) { comment =>
-        new MetadataBuilder().putString(COMMENT.str.toLowerCase, comment).build()
-      }
-      StructField(columnName, typ, nullable = true, meta)
-    }
-private[sql] object ResolvedDataSource {
-  // Short provider names mapped to the fully qualified class name that `lookupDataSource` loads.
-  private val builtinSources = Map(
-    "jdbc" -> "org.apache.spark.sql.jdbc.DefaultSource",
-    "json" -> "org.apache.spark.sql.json.DefaultSource",
-    "parquet" -> "org.apache.spark.sql.parquet.DefaultSource",
-    "orc" -> "org.apache.spark.sql.hive.orc.DefaultSource"
-  )
-  /**
-   * Given a provider name, look up the data source class definition.
-   *
-   * Built-in short names are resolved through `builtinSources`.  Any other name is first loaded
-   * as a class name and, if that class does not exist, as `<provider>.DefaultSource`.  If both
-   * attempts fail an error is raised via `sys.error`.
-   */
-  def lookupDataSource(provider: String): Class[_] = {
-    val loader = Utils.getContextOrSparkClassLoader
-    builtinSources.get(provider) match {
-      case Some(builtinClassName) =>
-        loader.loadClass(builtinClassName)
-      case None =>
-        try {
-          loader.loadClass(provider)
-        } catch {
-          case _: java.lang.ClassNotFoundException =>
-            try {
-              loader.loadClass(provider + ".DefaultSource")
-            } catch {
-              case _: java.lang.ClassNotFoundException =>
-                val message =
-                  if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
-                    "The ORC data source must be used with Hive support enabled."
-                  } else {
-                    s"Failed to load class for data source: $provider"
-                  }
-                sys.error(message)
-            }
-        }
-    }
-  }
-  /** Create a [[ResolvedDataSource]] for reading data in. */
-  def apply(
-      sqlContext: SQLContext,
-      userSpecifiedSchema: Option[StructType],
-      partitionColumns: Array[String],
-      provider: String,
-      options: Map[String, String]): ResolvedDataSource = {
-    val clazz: Class[_] = lookupDataSource(provider)
-    def className: String = clazz.getCanonicalName
-    val relation = userSpecifiedSchema match {
-      case Some(schema: StructType) => clazz.newInstance() match {
-        case dataSource: SchemaRelationProvider =>
-          dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
-        case dataSource: HadoopFsRelationProvider =>
-          val maybePartitionsSchema = if (partitionColumns.isEmpty) {
-            None
-          } else {
-            Some(partitionColumnsSchema(schema, partitionColumns))
-          }
-          val caseInsensitiveOptions = new CaseInsensitiveMap(options)
-          val paths = {
-            val patternPath = new Path(caseInsensitiveOptions("path"))
-            val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-            val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-            SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
-          }
-          val dataSchema =
-            StructType(schema.filterNot(f => partitionColumns.contains(f.name))).asNullable
-          dataSource.createRelation(
-            sqlContext,
-            paths,
-            Some(dataSchema),
-            maybePartitionsSchema,
-            caseInsensitiveOptions)
-        case dataSource: org.apache.spark.sql.sources.RelationProvider =>
-          throw new AnalysisException(s"$className does not allow user-specified schemas.")
-        case _ =>
-          throw new AnalysisException(s"$className is not a RelationProvider.")
-      }
-      case None => clazz.newInstance() match {
-        case dataSource: RelationProvider =>
-          dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
-        case dataSource: HadoopFsRelationProvider =>
-          val caseInsensitiveOptions = new CaseInsensitiveMap(options)
-          val paths = {
-            val patternPath = new Path(caseInsensitiveOptions("path"))
-            val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-            val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-            SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
-          }
-          dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
-        case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
-          throw new AnalysisException(
-            s"A schema needs to be specified when using $className.")
-        case _ =>
-          throw new AnalysisException(
-            s"$className is neither a RelationProvider nor a FSBasedRelationProvider.")
-      }
-    }
-    new ResolvedDataSource(clazz, relation)
-  }
-  private def partitionColumnsSchema(
-      schema: StructType,
-      partitionColumns: Array[String]): StructType = {
-    StructType(partitionColumns.map { col =>
-      schema.find(_.name == col).getOrElse {
-        throw new RuntimeException(s"Partition column $col not found in schema $schema")
-      }
-    }).asNullable
-  }
-  /** Create a [[ResolvedDataSource]] for saving the content of the given [[DataFrame]]. */
-  def apply(
-      sqlContext: SQLContext,
-      provider: String,
-      partitionColumns: Array[String],
-      mode: SaveMode,
-      options: Map[String, String],
-      data: DataFrame): ResolvedDataSource = {
-    if (data.schema.map(_.dataType).exists(_.isInstanceOf[IntervalType])) {
-      throw new AnalysisException("Cannot save interval data type into external storage.")
-    }
-    val clazz: Class[_] = lookupDataSource(provider)
-    val relation = clazz.newInstance() match {
-      case dataSource: CreatableRelationProvider =>
-        dataSource.createRelation(sqlContext, mode, options, data)
-      case dataSource: HadoopFsRelationProvider =>
-        // Don't glob path for the write path.  The contracts here are:
-        //  1. Only one output path can be specified on the write path;
-        //  2. Output path must be a legal HDFS style file system path;
-        //  3. It's OK that the output path doesn't exist yet;
-        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
-        val outputPath = {
-          val path = new Path(caseInsensitiveOptions("path"))
-          val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-          path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        }
-        val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name)))
-        val r = dataSource.createRelation(
-          sqlContext,
-          Array(outputPath.toString),
-          Some(dataSchema.asNullable),
-          Some(partitionColumnsSchema(data.schema, partitionColumns)),
-          caseInsensitiveOptions)
-        // For partitioned relation r, r.schema's column ordering can be different from the column
-        // ordering of data.logicalPlan (partition columns are all moved after data column).  This
-        // will be adjusted within InsertIntoHadoopFsRelation.
-        sqlContext.executePlan(
-          InsertIntoHadoopFsRelation(
-            r,
-            data.logicalPlan,
-            mode)).toRdd
-        r
-      case _ =>
-        sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
-    }
-    new ResolvedDataSource(clazz, relation)
-  }
-private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
- * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
- * @param table The table to be described.
- * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
- *                   It is effective only when the table is a Hive table.
- */
-private[sql] case class DescribeCommand(
-    table: LogicalPlan,
-    isExtended: Boolean) extends LogicalPlan with Command {
-  override def children: Seq[LogicalPlan] = Seq.empty
-  override val output: Seq[Attribute] = Seq(
-    // Column names are based on Hive.
-    AttributeReference("col_name", StringType, nullable = false,
-      new MetadataBuilder().putString("comment", "name of the column").build())(),
-    AttributeReference("data_type", StringType, nullable = false,
-      new MetadataBuilder().putString("comment", "data type of the column").build())(),
-    AttributeReference("comment", StringType, nullable = false,
-      new MetadataBuilder().putString("comment", "comment of the column").build())())
-  * Used to represent the operation of create table using a data source.
-  * @param allowExisting If it is true, we will do nothing when the table already exists.
-  *                      If it is false, an exception will be thrown
-  */
-private[sql] case class CreateTableUsing(
-    tableName: String,
-    userSpecifiedSchema: Option[StructType],
-    provider: String,
-    temporary: Boolean,
-    options: Map[String, String],
-    allowExisting: Boolean,
-    managedIfNoPath: Boolean) extends LogicalPlan with Command {
-  override def output: Seq[Attribute] = Seq.empty
-  override def children: Seq[LogicalPlan] = Seq.empty
- * A node used to support CTAS statements and saveAsTable for the data source API.
- * This node is a [[UnaryNode]] instead of a [[Command]] because we want the analyzer
- * can analyze the logical plan that will be used to populate the table.
- * So, [[PreWriteCheck]] can detect cases that are not allowed.
- */
-private[sql] case class CreateTableUsingAsSelect(
-    tableName: String,
-    provider: String,
-    temporary: Boolean,
-    partitionColumns: Array[String],
-    mode: SaveMode,
-    options: Map[String, String],
-    child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = Seq.empty[Attribute]
-  // TODO: Override resolved after we support databaseName.
-  // override lazy val resolved = databaseName != None && childrenResolved
-private[sql] case class CreateTempTableUsing(
-    tableName: String,
-    userSpecifiedSchema: Option[StructType],
-    provider: String,
-    options: Map[String, String]) extends RunnableCommand {
-  def run(sqlContext: SQLContext): Seq[InternalRow] = {
-    val resolved = ResolvedDataSource(
-      sqlContext, userSpecifiedSchema, Array.empty[String], provider, options)
-    sqlContext.registerDataFrameAsTable(
-      DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
-    Seq.empty
-  }
-private[sql] case class CreateTempTableUsingAsSelect(
-    tableName: String,
-    provider: String,
-    partitionColumns: Array[String],
-    mode: SaveMode,
-    options: Map[String, String],
-    query: LogicalPlan) extends RunnableCommand {
-  override def run(sqlContext: SQLContext): Seq[InternalRow] = {
-    val df = DataFrame(sqlContext, query)
-    val resolved = ResolvedDataSource(sqlContext, provider, partitionColumns, mode, options, df)
-    sqlContext.registerDataFrameAsTable(
-      DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
-    Seq.empty
-  }
-private[sql] case class RefreshTable(databaseName: String, tableName: String)
-  extends RunnableCommand {
-  override def run(sqlContext: SQLContext): Seq[InternalRow] = {
-    // Refresh the given table's metadata first.
-    sqlContext.catalog.refreshTable(databaseName, tableName)
-    // If this table is cached as a InMemoryColumnarRelation, drop the original
-    // cached version and make the new version cached lazily.
-    val logicalPlan = sqlContext.catalog.lookupRelation(Seq(databaseName, tableName))
-    // Use lookupCachedData directly since RefreshTable also takes databaseName.
-    val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
-    if (isCached) {
-      // Create a data frame to represent the table.
-      // TODO: Use uncacheTable once it supports database name.
-      val df = DataFrame(sqlContext, logicalPlan)
-      // Uncache the logicalPlan.
-      sqlContext.cacheManager.tryUncacheQuery(df, blocking = true)
-      // Cache it again.
-      sqlContext.cacheManager.cacheQuery(df, Some(tableName))
-    }
-    Seq.empty[InternalRow]
-  }
- * Builds a map in which keys are case insensitive
- */
-protected[sql] class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
-  with Serializable {
-  val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase))
-  override def get(k: String): Option[String] = baseMap.get(k.toLowerCase)
-  override def + [B1 >: String](kv: (String, B1)): Map[String, B1] =
-    baseMap + kv.copy(_1 = kv._1.toLowerCase)
-  override def iterator: Iterator[(String, String)] = baseMap.iterator
-  override def -(key: String): Map[String, String] = baseMap - key.toLowerCase
- * The exception thrown from the DDL parser.
- */
-protected[sql] class DDLException(message: String) extends Exception(message)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
index 24e86ca..4d942e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
@@ -17,6 +17,10 @@
 package org.apache.spark.sql.sources
+// This file defines all the filters that we can push down to the data sources.
  * A filter predicate for data sources.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 2cd8b35..7cd005b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.sources
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 import org.apache.hadoop.conf.Configuration
@@ -33,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.execution.RDDConversions
+import org.apache.spark.sql.execution.datasources.{PartitioningUtils, PartitionSpec, Partition}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql._
 import org.apache.spark.util.SerializableConfiguration
@@ -523,7 +523,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
-  private[sources] final def buildScan(
+  private[sql] final def buildScan(
       requiredColumns: Array[String],
       filters: Array[Filter],
       inputPaths: Array[String],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
deleted file mode 100644
index 40ee048..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-package org.apache.spark.sql.sources
-import org.apache.spark.sql.{SaveMode, AnalysisException}
-import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, Catalog}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Alias}
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.types.DataType
- * A rule to do pre-insert data type casting and field renaming. Before we insert into
- * an [[InsertableRelation]], we will use this rule to make sure that
- * the columns to be inserted have the correct data type and fields have the correct names.
- */
-private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-      // Wait until children are resolved.
-      case p: LogicalPlan if !p.childrenResolved => p
-      // We are inserting into an InsertableRelation or HadoopFsRelation.
-      case i @ InsertIntoTable(
-      l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation), _, child, _, _) => {
-        // First, make sure the data to be inserted have the same number of fields with the
-        // schema of the relation.
-        if (l.output.size != child.output.size) {
-          sys.error(
-            s"$l requires that the query in the SELECT clause of the INSERT INTO/OVERWRITE " +
-              s"statement generates the same number of columns as its schema.")
-        }
-        castAndRenameChildOutput(i, l.output, child)
-      }
-  }
-  /** If necessary, cast data types and rename fields to the expected types and names. */
-  def castAndRenameChildOutput(
-      insertInto: InsertIntoTable,
-      expectedOutput: Seq[Attribute],
-      child: LogicalPlan): InsertIntoTable = {
-    val newChildOutput = expectedOutput.zip(child.output).map {
-      case (expected, actual) =>
-        val needCast = !expected.dataType.sameType(actual.dataType)
-        // We want to make sure the filed names in the data to be inserted exactly match
-        // names in the schema.
-        val needRename = expected.name != actual.name
-        (needCast, needRename) match {
-          case (true, _) => Alias(Cast(actual, expected.dataType), expected.name)()
-          case (false, true) => Alias(actual, expected.name)()
-          case (_, _) => actual
-        }
-    }
-    if (newChildOutput == child.output) {
-      insertInto
-    } else {
-      insertInto.copy(child = Project(newChildOutput, child))
-    }
-  }
- * A rule to do various checks before inserting into or writing to a data source table.
- */
-private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => Unit) {
-  def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) }
-  def apply(plan: LogicalPlan): Unit = {
-    plan.foreach {
-      case i @ logical.InsertIntoTable(
-        l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite, ifNotExists) =>
-        // Right now, we do not support insert into a data source table with partition specs.
-        if (partition.nonEmpty) {
-          failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
-        } else {
-          // Get all input data source relations of the query.
-          val srcRelations = query.collect {
-            case LogicalRelation(src: BaseRelation) => src
-          }
-          if (srcRelations.contains(t)) {
-            failAnalysis(
-              "Cannot insert overwrite into table that is also being read from.")
-          } else {
-            // OK
-          }
-        }
-      case logical.InsertIntoTable(LogicalRelation(r: HadoopFsRelation), part, _, _, _) =>
-        // We need to make sure the partition columns specified by users do match partition
-        // columns of the relation.
-        val existingPartitionColumns = r.partitionColumns.fieldNames.toSet
-        val specifiedPartitionColumns = part.keySet
-        if (existingPartitionColumns != specifiedPartitionColumns) {
-          failAnalysis(s"Specified partition columns " +
-            s"(${specifiedPartitionColumns.mkString(", ")}) " +
-            s"do not match the partition columns of the table. Please use " +
-            s"(${existingPartitionColumns.mkString(", ")}) as the partition columns.")
-        } else {
-          // OK
-        }
-      case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) =>
-        // The relation in l is not an InsertableRelation.
-        failAnalysis(s"$l does not allow insertion.")
-      case logical.InsertIntoTable(t, _, _, _, _) =>
-        if (!t.isInstanceOf[LeafNode] || t == OneRowRelation || t.isInstanceOf[LocalRelation]) {
-          failAnalysis(s"Inserting into an RDD-based table is not allowed.")
-        } else {
-          // OK
-        }
-      case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) =>
-        // When the SaveMode is Overwrite, we need to check if the table is an input table of
-        // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
-        if (catalog.tableExists(Seq(tableName))) {
-          // Need to remove SubQuery operator.
-          EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) match {
-            // Only do the check if the table is a data source table
-            // (the relation is a BaseRelation).
-            case l @ LogicalRelation(dest: BaseRelation) =>
-              // Get all input data source relations of the query.
-              val srcRelations = query.collect {
-                case LogicalRelation(src: BaseRelation) => src
-              }
-              if (srcRelations.contains(dest)) {
-                failAnalysis(
-                  s"Cannot overwrite table $tableName that is also being read from.")
-              } else {
-                // OK
-              }
-            case _ => // OK
-          }
-        } else {
-          // OK
-        }
-      case _ => // OK
-    }
-  }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 3475f9d..1d04513 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -26,8 +26,8 @@ import org.scalactic.Tolerance._
 import org.apache.spark.sql.{QueryTest, Row, SQLConf}
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.json.InferSchema.compatibleType
-import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index a2763c7..23df102 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -24,7 +24,7 @@ import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.sources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Column, DataFrame, QueryTest, Row, SQLConf}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 37b0a9f..4f98776 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -28,11 +28,11 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.sources.PartitioningUtils._
-import org.apache.spark.sql.sources.{LogicalRelation, Partition, PartitionSpec}
+import org.apache.spark.sql.execution.datasources.{LogicalRelation, PartitionSpec, Partition, PartitioningUtils}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql._
 import org.apache.spark.unsafe.types.UTF8String
+import PartitioningUtils._
 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index a710884..1907e64 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -22,6 +22,7 @@ import java.io.{File, IOException}
 import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.execution.datasources.DDLException
 import org.apache.spark.util.Utils
 class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 296b0d6..3cbf546 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.sources
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.datasources.ResolvedDataSource
 class ResolvedDataSourceSuite extends SparkFunSuite {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 4684d48..cec7685 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -44,9 +44,9 @@ import org.apache.spark.sql.catalyst.ParserDialect
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUDFs, SetCommand}
+import org.apache.spark.sql.execution.datasources.{PreWriteCheck, PreInsertCastAndRename, DataSourceStrategy}
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
-import org.apache.spark.sql.sources.DataSourceStrategy
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -384,11 +384,11 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
         catalog.PreInsertionCasts ::
         ExtractPythonUDFs ::
         ResolveHiveWindowFunction ::
-        sources.PreInsertCastAndRename ::
+        PreInsertCastAndRename ::
       override val extendedCheckRules = Seq(
-        sources.PreWriteCheck(catalog)
+        PreWriteCheck(catalog)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index b15261b..0a2121c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -17,6 +17,8 @@
 package org.apache.spark.sql.hive
+import scala.collection.JavaConversions._
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
@@ -28,6 +30,7 @@ import org.apache.hadoop.hive.ql.metadata._
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.spark.Logging
+import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
 import org.apache.spark.sql.catalyst.expressions._
@@ -35,14 +38,12 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.execution.datasources
+import org.apache.spark.sql.execution.datasources.{Partition => ParquetPartition, PartitionSpec, CreateTableUsingAsSelect, ResolvedDataSource, LogicalRelation}
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.parquet.ParquetRelation2
-import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode, sources}
-/* Implicit conversions */
-import scala.collection.JavaConversions._
 private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: HiveContext)
   extends Catalog with Logging {
@@ -278,7 +279,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
             parquetRelation.paths.toSet == pathsInMetastore.toSet &&
             logical.schema.sameType(metastoreSchema) &&
             parquetRelation.partitionSpec == partitionSpecInMetastore.getOrElse {
-              PartitionSpec(StructType(Nil), Array.empty[sources.Partition])
+              PartitionSpec(StructType(Nil), Array.empty[datasources.Partition])
           if (useCached) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 7fc517b..f557450 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.execution.ExplainCommand
-import org.apache.spark.sql.sources.DescribeCommand
+import org.apache.spark.sql.execution.datasources.DescribeCommand
 import org.apache.spark.sql.hive.HiveShim._
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable, HiveScriptIOSchema}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 9638a82..a22c329 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -30,9 +30,9 @@ import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand, _}
+import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTableUsingAsSelect, DescribeCommand}
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.sources.{CreateTableUsing, CreateTableUsingAsSelect, DescribeCommand}
 import org.apache.spark.sql.types.StringType
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 71fa3e9..a47f9a4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 48d35a6..de63ee5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -37,6 +37,7 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.{HadoopRDD, RDD}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.datasources.PartitionSpec
 import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreTypes, HiveShim}
 import org.apache.spark.sql.sources.{Filter, _}
 import org.apache.spark.sql.types.StructType
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index d910af2..e403f32 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -28,12 +28,12 @@ import org.apache.hadoop.mapred.InvalidInputException
 import org.apache.spark.Logging
 import org.apache.spark.sql._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.parquet.ParquetRelation2
-import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index c9dd4c0..efb04bf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -22,11 +22,11 @@ import java.io._
 import org.scalatest.{BeforeAndAfterAll, GivenWhenThen}
 import org.apache.spark.{Logging, SparkFunSuite}
-import org.apache.spark.sql.sources.DescribeCommand
-import org.apache.spark.sql.execution.{SetCommand, ExplainCommand}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.{SetCommand, ExplainCommand}
+import org.apache.spark.sql.execution.datasources.DescribeCommand
 import org.apache.spark.sql.hive.test.TestHive
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 05a1f00..0342826 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -23,12 +23,12 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.DefaultParserDialect
 import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.errors.DialectException
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.hive.{HiveContext, HiveQLDialect, MetastoreRelation}
 import org.apache.spark.sql.parquet.ParquetRelation2
-import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
 case class Nested1(f1: Nested2)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 9d79a4b..82a8daf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -23,12 +23,12 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
+import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
-import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index afecf96..1cef83f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -17,10 +17,10 @@
 package org.apache.spark.sql.sources
-import scala.collection.JavaConversions._
+import scala.collection.JavaConversions._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -31,10 +31,12 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
 import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
   override lazy val sqlContext: SQLContext = TestHive

[3/3] spark git commit: [SPARK-8906][SQL] Move all internal data source classes into execution.datasources.

Posted by
[SPARK-8906][SQL] Move all internal data source classes into execution.datasources.

This way, the sources package contains only public facing interfaces.

Author: Reynold Xin <>

Closes #7565 from rxin/move-ds and squashes the following commits:

7661aff [Reynold Xin] Mima
9d5196a [Reynold Xin] Rearranged imports.
3dd7174 [Reynold Xin] [SPARK-8906][SQL] Move all internal data source classes into execution.datasources.


Branch: refs/heads/master
Commit: 60c0ce134d90ef18852ed2c637d2f240b7f99ab9
Parents: 9ba7c64
Author: Reynold Xin <>
Authored: Tue Jul 21 11:56:38 2015 -0700
Committer: Reynold Xin <>
Committed: Tue Jul 21 11:56:38 2015 -0700

 project/MimaExcludes.scala                      |  47 ++
 .../scala/org/apache/spark/sql/DataFrame.scala  |   2 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |   4 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |   2 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |   9 +-
 .../spark/sql/execution/SparkStrategies.scala   |   4 +-
 .../spark/sql/execution/SqlNewHadoopRDD.scala   | 263 +++++++++
 .../datasources/DataSourceStrategy.scala        | 394 +++++++++++++
 .../execution/datasources/LogicalRelation.scala |  58 ++
 .../datasources/PartitioningUtils.scala         | 361 ++++++++++++
 .../sql/execution/datasources/commands.scala    | 582 +++++++++++++++++++
 .../spark/sql/execution/datasources/ddl.scala   | 492 ++++++++++++++++
 .../spark/sql/execution/datasources/rules.scala | 158 +++++
 .../apache/spark/sql/parquet/newParquet.scala   |   5 +-
 .../spark/sql/sources/DataSourceStrategy.scala  | 395 -------------
 .../spark/sql/sources/LogicalRelation.scala     |  57 --
 .../spark/sql/sources/PartitioningUtils.scala   | 360 ------------
 .../spark/sql/sources/SqlNewHadoopRDD.scala     | 264 ---------
 .../org/apache/spark/sql/sources/commands.scala | 581 ------------------
 .../org/apache/spark/sql/sources/ddl.scala      | 493 ----------------
 .../org/apache/spark/sql/sources/filters.scala  |   4 +
 .../apache/spark/sql/sources/interfaces.scala   |   4 +-
 .../org/apache/spark/sql/sources/rules.scala    | 158 -----
 .../org/apache/spark/sql/json/JsonSuite.scala   |   2 +-
 .../spark/sql/parquet/ParquetFilterSuite.scala  |   2 +-
 .../ParquetPartitionDiscoverySuite.scala        |   4 +-
 .../sql/sources/CreateTableAsSelectSuite.scala  |   1 +
 .../sql/sources/ResolvedDataSourceSuite.scala   |   1 +
 .../org/apache/spark/sql/hive/HiveContext.scala |   6 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  11 +-
 .../org/apache/spark/sql/hive/HiveQl.scala      |   2 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  |   2 +-
 .../spark/sql/hive/execution/commands.scala     |   1 +
 .../apache/spark/sql/hive/orc/OrcRelation.scala |   1 +
 .../sql/hive/MetastoreDataSourcesSuite.scala    |   2 +-
 .../sql/hive/execution/HiveComparisonTest.scala |   4 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |   2 +-
 .../apache/spark/sql/hive/parquetSuites.scala   |   2 +-
 .../sql/sources/hadoopFsRelationSuites.scala    |   6 +-
 39 files changed, 2404 insertions(+), 2342 deletions(-)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a2595ff..fa36629 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -104,6 +104,53 @@ object MimaExcludes {
             // SPARK-7422 add argmax for sparse vectors
+          ) ++ Seq(
+            // SPARK-8906 Move all internal data source classes into execution.datasources
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.ResolvedDataSource"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreInsertCastAndRename$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsingAsSelect$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoDataSource$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopPartition"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$PartitionValues$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DefaultWriterContainer"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$PartitionValues"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.RefreshTable$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsing$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DynamicPartitionWriterContainer"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsingAsSelect"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreInsertCastAndRename"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.Partition$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.LogicalRelation$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.LogicalRelation"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.Partition"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.BaseWriterContainer"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreWriteCheck"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsing"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.RefreshTable"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DataSourceStrategy$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsing"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsingAsSelect$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsingAsSelect"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsing$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.ResolvedDataSource$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreWriteCheck$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoDataSource"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoHadoopFsRelation"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLParser"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CaseInsensitiveMap"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DataSourceStrategy"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLException")
         case v if v.startsWith("1.4") =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 830fba3..323ff17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -38,8 +38,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, _}
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
+import org.apache.spark.sql.execution.datasources.CreateTableUsingAsSelect
 import org.apache.spark.sql.json.JacksonGenerator
-import org.apache.spark.sql.sources.CreateTableUsingAsSelect
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index f1c1ddf..e9d782c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -20,16 +20,16 @@ package org.apache.spark.sql
 import java.util.Properties
 import org.apache.hadoop.fs.Path
-import org.apache.spark.{Logging, Partition}
+import org.apache.spark.{Logging, Partition}
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
 import org.apache.spark.sql.json.JSONRelation
 import org.apache.spark.sql.parquet.ParquetRelation2
-import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.types.StructType
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 3e7b9cd..ee0201a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -22,8 +22,8 @@ import java.util.Properties
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, ResolvedDataSource}
 import org.apache.spark.sql.jdbc.{JDBCWriteDetails, JdbcUtils}
-import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 2dda3ad..8b4528b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -39,8 +39,9 @@ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, _}
-import org.apache.spark.sql.execution.{Filter, _}
-import org.apache.spark.sql.sources._
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -146,11 +147,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
     new Analyzer(catalog, functionRegistry, conf) {
       override val extendedResolutionRules =
         ExtractPythonUDFs ::
-        sources.PreInsertCastAndRename ::
+        PreInsertCastAndRename ::
       override val extendedCheckRules = Seq(
-        sources.PreWriteCheck(catalog)
+        datasources.PreWriteCheck(catalog)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 240332a..8cef7f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.execution
+import org.apache.spark.sql.{SQLContext, Strategy, execution}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
@@ -25,10 +26,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
 import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
+import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _}
 import org.apache.spark.sql.parquet._
-import org.apache.spark.sql.sources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{SQLContext, Strategy, execution}
 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   self: SQLContext#SparkPlanner =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala
new file mode 100644
index 0000000..e1c1a6c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala
@@ -0,0 +1,263 @@
+package org.apache.spark.sql.execution
+import java.text.SimpleDateFormat
+import java.util.Date
+import org.apache.spark.{Partition => SparkPartition, _}
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.DataReadMethod
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
+import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
+import org.apache.spark.rdd.{HadoopRDD, RDD}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+import scala.reflect.ClassTag
+private[spark] class SqlNewHadoopPartition(
+    rddId: Int,
+    val index: Int,
+    @transient rawSplit: InputSplit with Writable)
+  extends SparkPartition {
+  val serializableHadoopSplit = new SerializableWritable(rawSplit)
+  override def hashCode(): Int = 41 * (41 + rddId) + index
+ * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
+ * sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`).
+ * It is based on [[org.apache.spark.rdd.NewHadoopRDD]]. It has three additions.
+ * 1. A shared broadcast Hadoop Configuration.
+ * 2. An optional closure `initDriverSideJobFuncOpt` that set configurations at the driver side
+ *    to the shared Hadoop Configuration.
+ * 3. An optional closure `initLocalJobFuncOpt` that set configurations at both the driver side
+ *    and the executor side to the shared Hadoop Configuration.
+ *
+ * Note: This is RDD is basically a cloned version of [[org.apache.spark.rdd.NewHadoopRDD]] with
+ * changes based on [[org.apache.spark.rdd.HadoopRDD]]. In future, this functionality will be
+ * folded into core.
+ */
+private[sql] class SqlNewHadoopRDD[K, V](
+    @transient sc : SparkContext,
+    broadcastedConf: Broadcast[SerializableConfiguration],
+    @transient initDriverSideJobFuncOpt: Option[Job => Unit],
+    initLocalJobFuncOpt: Option[Job => Unit],
+    inputFormatClass: Class[_ <: InputFormat[K, V]],
+    keyClass: Class[K],
+    valueClass: Class[V])
+  extends RDD[(K, V)](sc, Nil)
+  with SparkHadoopMapReduceUtil
+  with Logging {
+  protected def getJob(): Job = {
+    val conf: Configuration = broadcastedConf.value.value
+    // "new Job" will make a copy of the conf. Then, it is
+    // safe to mutate conf properties with initLocalJobFuncOpt
+    // and initDriverSideJobFuncOpt.
+    val newJob = new Job(conf)
+    initLocalJobFuncOpt.map(f => f(newJob))
+    newJob
+  }
+  def getConf(isDriverSide: Boolean): Configuration = {
+    val job = getJob()
+    if (isDriverSide) {
+      initDriverSideJobFuncOpt.map(f => f(job))
+    }
+    job.getConfiguration
+  }
+  private val jobTrackerId: String = {
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    formatter.format(new Date())
+  }
+  @transient protected val jobId = new JobID(jobTrackerId, id)
+  override def getPartitions: Array[SparkPartition] = {
+    val conf = getConf(isDriverSide = true)
+    val inputFormat = inputFormatClass.newInstance
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
+    }
+    val jobContext = newJobContext(conf, jobId)
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val result = new Array[SparkPartition](rawSplits.size)
+    for (i <- 0 until rawSplits.size) {
+      result(i) =
+        new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+    }
+    result
+  }
+  override def compute(
+      theSplit: SparkPartition,
+      context: TaskContext): InterruptibleIterator[(K, V)] = {
+    val iter = new Iterator[(K, V)] {
+      val split = theSplit.asInstanceOf[SqlNewHadoopPartition]
+      logInfo("Input split: " + split.serializableHadoopSplit)
+      val conf = getConf(isDriverSide = false)
+      val inputMetrics = context.taskMetrics
+        .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+      // Find a function that will return the FileSystem bytes read by this thread. Do this before
+      // creating RecordReader, because RecordReader's constructor might read some bytes
+      val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
+        split.serializableHadoopSplit.value match {
+          case _: FileSplit | _: CombineFileSplit =>
+            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+          case _ => None
+        }
+      }
+      inputMetrics.setBytesReadCallback(bytesReadCallback)
+      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
+      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
+      val format = inputFormatClass.newInstance
+      format match {
+        case configurable: Configurable =>
+          configurable.setConf(conf)
+        case _ =>
+      }
+      val reader = format.createRecordReader(
+        split.serializableHadoopSplit.value, hadoopAttemptContext)
+      reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+      // Register an on-task-completion callback to close the input stream.
+      context.addTaskCompletionListener(context => close())
+      var havePair = false
+      var finished = false
+      var recordsSinceMetricsUpdate = 0
+      override def hasNext: Boolean = {
+        if (!finished && !havePair) {
+          finished = !reader.nextKeyValue
+          havePair = !finished
+        }
+        !finished
+      }
+      override def next(): (K, V) = {
+        if (!hasNext) {
+          throw new java.util.NoSuchElementException("End of stream")
+        }
+        havePair = false
+        if (!finished) {
+          inputMetrics.incRecordsRead(1)
+        }
+        (reader.getCurrentKey, reader.getCurrentValue)
+      }
+      private def close() {
+        try {
+          reader.close()
+          if (bytesReadCallback.isDefined) {
+            inputMetrics.updateBytesRead()
+          } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+                     split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+            // If we can't get the bytes read from the FS stats, fall back to the split size,
+            // which may be inaccurate.
+            try {
+              inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+            } catch {
+              case e: java.io.IOException =>
+                logWarning("Unable to get input size to set InputMetrics for task", e)
+            }
+          }
+        } catch {
+          case e: Exception => {
+            if (!Utils.inShutdown()) {
+              logWarning("Exception in RecordReader.close()", e)
+            }
+          }
+        }
+      }
+    }
+    new InterruptibleIterator(context, iter)
+  }
+  /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
+  @DeveloperApi
+  def mapPartitionsWithInputSplit[U: ClassTag](
+      f: (InputSplit, Iterator[(K, V)]) => Iterator[U],
+      preservesPartitioning: Boolean = false): RDD[U] = {
+    new NewHadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
+  }
+  override def getPreferredLocations(hsplit: SparkPartition): Seq[String] = {
+    val split = hsplit.asInstanceOf[SqlNewHadoopPartition].serializableHadoopSplit.value
+    val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
+      case Some(c) =>
+        try {
+          val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
+          Some(HadoopRDD.convertSplitLocationInfo(infos))
+        } catch {
+          case e : Exception =>
+            logDebug("Failed to use InputSplit#getLocationInfo.", e)
+            None
+        }
+      case None => None
+    }
+    locs.getOrElse(split.getLocations.filter(_ != "localhost"))
+  }
+  override def persist(storageLevel: StorageLevel): this.type = {
+    if (storageLevel.deserialized) {
+      logWarning("Caching NewHadoopRDDs as deserialized objects usually leads to undesired" +
+        " behavior because Hadoop's RecordReader reuses the same Writable object for all records." +
+        " Use a map transformation to make copies of the records.")
+    }
+    super.persist(storageLevel)
+  }
+private[spark] object SqlNewHadoopRDD {
+  /**
+   * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
+   * the given function rather than the index of the partition.
+   */
+  private[spark] class NewHadoopMapPartitionsWithSplitRDD[U: ClassTag, T: ClassTag](
+      prev: RDD[T],
+      f: (InputSplit, Iterator[T]) => Iterator[U],
+      preservesPartitioning: Boolean = false)
+    extends RDD[U](prev) {
+    override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+    override def getPartitions: Array[SparkPartition] = firstParent[T].partitions
+    override def compute(split: SparkPartition, context: TaskContext): Iterator[U] = {
+      val partition = split.asInstanceOf[SqlNewHadoopPartition]
+      val inputSplit = partition.serializableHadoopSplit.value
+      f(inputSplit, firstParent[T].iterator(split, context))
+    }
+  }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
new file mode 100644
index 0000000..2b40092
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -0,0 +1,394 @@
+package org.apache.spark.sql.execution.datasources
+import org.apache.spark.{Logging, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.sql.{SaveMode, Strategy, execution, sources, _}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+ * A Strategy for planning scans over data sources defined using the sources API.
+ */
+private[sql] object DataSourceStrategy extends Strategy with Logging {
+  /**
+   * Plans a physical scan or insert command for logical plans rooted at a data source relation.
+   *
+   * Scans are dispatched on the capability trait the relation implements (CatalystScan,
+   * PrunedFilteredScan, PrunedScan, HadoopFsRelation, TableScan); inserts are planned for
+   * InsertableRelation and HadoopFsRelation targets. Returns Nil when no case applies.
+   */
+  def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan)) =>
+      pruneFilterProjectRaw(
+        l,
+        projects,
+        filters,
+        (a, f) => toCatalystRDD(l, a, t.buildScan(a, f))) :: Nil
+    // The name-based scan interfaces below take required columns as an array of column names,
+    // so the requested attributes are converted with `a.map(_.name).toArray`.
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan)) =>
+      pruneFilterProject(
+        l,
+        projects,
+        filters,
+        (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan)) =>
+      pruneFilterProject(
+        l,
+        projects,
+        filters,
+        (a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil
+    // Scanning partitioned HadoopFsRelation
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation))
+        if t.partitionSpec.partitionColumns.nonEmpty =>
+      val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
+      logInfo {
+        val total = t.partitionSpec.partitions.length
+        val selected = selectedPartitions.length
+        val percentPruned = (1 - selected.toDouble / total.toDouble) * 100
+        s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
+      }
+      // Only pushes down predicates that do not reference partition columns.
+      val pushedFilters = {
+        val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet
+        filters.filter { f =>
+          val referencedColumnNames = f.references.map(_.name).toSet
+          referencedColumnNames.intersect(partitionColumnNames).isEmpty
+        }
+      }
+      buildPartitionedTableScan(
+        l,
+        projects,
+        pushedFilters,
+        t.partitionSpec.partitionColumns,
+        selectedPartitions) :: Nil
+    // Scanning non-partitioned HadoopFsRelation
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
+      // See buildPartitionedTableScan for the reason that we need to create a shared
+      // broadcast HadoopConf.
+      val sharedHadoopConf = SparkHadoopUtil.get.conf
+      val confBroadcast =
+        t.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
+      pruneFilterProject(
+        l,
+        projects,
+        filters,
+        (a, f) =>
+          toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f, t.paths, confBroadcast))) :: Nil
+    case l @ LogicalRelation(t: TableScan) =>
+      execution.PhysicalRDD(l.output, toCatalystRDD(l, t.buildScan())) :: Nil
+    // Inserting into a plain InsertableRelation is only planned when no partition is specified.
+    case i @ logical.InsertIntoTable(
+      l @ LogicalRelation(t: InsertableRelation), part, query, overwrite, false) if part.isEmpty =>
+      execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
+    case i @ logical.InsertIntoTable(
+      l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) =>
+      val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
+      execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil
+    case _ => Nil
+  }
+  /**
+   * Builds the physical scan of a partitioned HadoopFsRelation: one scan per selected partition
+   * directory, each merged with that partition's values, all unioned into a single PhysicalRDD.
+   *
+   * @param logicalRelation  logical relation wrapping the HadoopFsRelation to scan
+   * @param projections      expressions requested by the query
+   * @param filters          predicates to push down to each per-partition scan
+   * @param partitionColumns schema of the partition columns
+   * @param partitions       partitions that survived pruning
+   */
+  private def buildPartitionedTableScan(
+      logicalRelation: LogicalRelation,
+      projections: Seq[NamedExpression],
+      filters: Seq[Expression],
+      partitionColumns: StructType,
+      partitions: Array[Partition]) = {
+    val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]
+    // Because we are creating one RDD per partition, we need to have a shared HadoopConf.
+    // Otherwise, the cost of broadcasting HadoopConf in every RDD will be high.
+    val sharedHadoopConf = SparkHadoopUtil.get.conf
+    val confBroadcast =
+      relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
+    // Builds RDD[Row]s for each selected partition.
+    val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
+      // The table scan operator (PhysicalRDD) which retrieves required columns from data files.
+      // Notice that the schema of data files, represented by `relation.dataSchema`, may contain
+      // some partition column(s).
+      val scan =
+        pruneFilterProject(
+          logicalRelation,
+          projections,
+          filters,
+          (columns: Seq[Attribute], filters) => {
+            val partitionColNames = partitionColumns.fieldNames
+            // Don't scan any partition columns to save I/O.  Here we are being optimistic and
+            // assuming partition columns data stored in data files are always consistent with those
+            // partition values encoded in partition directory paths.
+            val needed = columns.filterNot(a => partitionColNames.contains(a.name))
+            val dataRows =
+              relation.buildScan(needed.map(_.name).toArray, filters, Array(dir), confBroadcast)
+            // Merges data values with partition values.
+            mergeWithPartitionValues(
+              relation.schema,
+              columns.map(_.name).toArray,
+              partitionColNames,
+              partitionValues,
+              toCatalystRDD(logicalRelation, needed, dataRows))
+          })
+      scan.execute()
+    }
+    val unionedRows =
+      if (perPartitionRows.length == 0) {
+        relation.sqlContext.emptyResult
+      } else {
+        new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows)
+      }
+    execution.PhysicalRDD(projections.map(_.toAttribute), unionedRows)
+  }
+  /**
+   * Combines rows scanned from data files with the constant values of their partition.
+   *
+   * When `requiredColumns` contains no partition column, `dataRows` is returned untouched.
+   * Otherwise each output row is assembled column by column, taking partition columns from
+   * `partitionValues` and all other columns from the scanned row. A single mutable row is
+   * reused for every record of a task.
+   *
+   * @param schema           full schema used to look up the data type of each required column
+   * @param requiredColumns  names of the output columns, in output order
+   * @param partitionColumns names of the partition columns
+   * @param partitionValues  values of the partition columns for this partition
+   * @param dataRows         rows holding only the non-partition required columns
+   */
+  private def mergeWithPartitionValues(
+      schema: StructType,
+      requiredColumns: Array[String],
+      partitionColumns: Array[String],
+      partitionValues: InternalRow,
+      dataRows: RDD[InternalRow]): RDD[InternalRow] = {
+    val nonPartitionColumns = requiredColumns.filterNot(partitionColumns.contains)
+    // If output columns contain any partition column(s), we need to merge scanned data
+    // columns and requested partition columns to form the final result.
+    if (!requiredColumns.sameElements(nonPartitionColumns)) {
+      // One writer function per output column, chosen once up front.
+      val mergers = requiredColumns.zipWithIndex.map { case (name, index) =>
+        // To see whether the `index`-th column is a partition column...
+        val i = partitionColumns.indexOf(name)
+        if (i != -1) {
+          // If yes, gets column value from partition values.
+          (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
+            mutableRow(ordinal) = partitionValues(i)
+          }
+        } else {
+          // Otherwise, inherits the value from scanned data.
+          val i = nonPartitionColumns.indexOf(name)
+          (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
+            mutableRow(ordinal) = dataRow(i)
+          }
+        }
+      }
+      // Since we know for sure that this closure is serializable, we can avoid the overhead
+      // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally
+      // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718).
+      val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[InternalRow]) => {
+        val dataTypes = requiredColumns.map(schema(_).dataType)
+        val mutableRow = new SpecificMutableRow(dataTypes)
+        iterator.map { dataRow =>
+          var i = 0
+          while (i < mutableRow.length) {
+            mergers(i)(mutableRow, dataRow, i)
+            i += 1
+          }
+          mutableRow.asInstanceOf[InternalRow]
+        }
+      }
+      // This is an internal RDD whose call site the user should not be concerned with
+      // Since we create many of these (one per partition), the time spent on computing
+      // the call site may add up.
+      Utils.withDummyCallSite(dataRows.sparkContext) {
+        new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false)
+      }
+    } else {
+      dataRows
+    }
+  }
+  /**
+   * Keeps only the partitions whose values satisfy every predicate that references nothing but
+   * partition columns. Predicates touching any other column are ignored here; when no predicate
+   * qualifies, all partitions are returned.
+   */
+  protected def prunePartitions(
+      predicates: Seq[Expression],
+      partitionSpec: PartitionSpec): Seq[Partition] = {
+    val PartitionSpec(partitionColumns, partitions) = partitionSpec
+    val partitionColumnNames = partitionColumns.map(_.name).toSet
+    val partitionPruningPredicates = predicates.filter {
+      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+    }
+    if (partitionPruningPredicates.nonEmpty) {
+      val predicate =
+        partitionPruningPredicates
+          .reduceOption(expressions.And)
+          .getOrElse(Literal(true))
+      // Bind attribute references by name to ordinals of the partition value row so that the
+      // predicate can be evaluated directly against `Partition.values`.
+      val boundPredicate = InterpretedPredicate.create(predicate.transform {
+        case a: AttributeReference =>
+          val index = partitionColumns.indexWhere(a.name == _.name)
+          BoundReference(index, partitionColumns(index).dataType, nullable = true)
+      })
+      partitions.filter { case Partition(values, _) => boundPredicate(values) }
+    } else {
+      partitions
+    }
+  }
+  // Based on Public API.
+  // Wraps a scan builder that expects data source `Filter`s so that it can be driven by
+  // `pruneFilterProjectRaw`, converting the pushed-down predicates with `selectFilters`.
+  protected def pruneFilterProject(
+      relation: LogicalRelation,
+      projects: Seq[NamedExpression],
+      filterPredicates: Seq[Expression],
+      scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow]) = {
+    val catalystScanBuilder = (columns: Seq[Attribute], predicates: Seq[Expression]) => {
+      val convertedFilters = selectFilters(predicates).toArray
+      scanBuilder(columns, convertedFilters)
+    }
+    pruneFilterProjectRaw(relation, projects, filterPredicates, catalystScanBuilder)
+  }
+  // Based on Catalyst expressions.
+  // Plans a column-pruned scan through `scanBuilder`, re-applies the combined filter condition
+  // on top of the scan, and adds a Project unless the projection is a plain list of distinct
+  // attributes that also covers every column the filters need.
+  protected def pruneFilterProjectRaw(
+      relation: LogicalRelation,
+      projects: Seq[NamedExpression],
+      filterPredicates: Seq[Expression],
+      scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[InternalRow]) = {
+    val projectSet = AttributeSet(projects.flatMap(_.references))
+    val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
+    val filterCondition = filterPredicates.reduceLeftOption(expressions.And)
+    val pushedFilters = filterPredicates.map { _ transform {
+      case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
+    }}
+    if (projects.map(_.toAttribute) == projects &&
+        projectSet.size == projects.size &&
+        filterSet.subsetOf(projectSet)) {
+      // When it is possible to just use column pruning to get the right projection and
+      // when the columns of this projection are enough to evaluate all filter conditions,
+      // just do a scan followed by a filter, with no extra project.
+      val requestedColumns =
+        projects.asInstanceOf[Seq[Attribute]] // Safe due to if above.
+          .map(relation.attributeMap)            // Match original case of attributes.
+      val scan = execution.PhysicalRDD(projects.map(_.toAttribute),
+        scanBuilder(requestedColumns, pushedFilters))
+      filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
+    } else {
+      val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq
+      val scan = execution.PhysicalRDD(requestedColumns,
+        scanBuilder(requestedColumns, pushedFilters))
+      execution.Project(projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
+    }
+  }
+  /**
+   * Convert RDD of Row into RDD of InternalRow with objects in catalyst types.
+   *
+   * Conversion is only performed when the relation reports `needConversion`; otherwise the rows
+   * are taken to be InternalRows already and are merely cast.
+   *
+   * @param relation logical relation whose underlying relation produced `rdd`
+   * @param output   attributes describing the columns carried by each row of `rdd`
+   * @param rdd      rows returned by the relation's scan
+   */
+  private[this] def toCatalystRDD(
+      relation: LogicalRelation,
+      output: Seq[Attribute],
+      rdd: RDD[Row]): RDD[InternalRow] = {
+    if (relation.relation.needConversion) {
+      execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
+    } else {
+      rdd.map(_.asInstanceOf[InternalRow])
+    }
+  }
+  /**
+   * Convert RDD of Row into RDD of InternalRow with objects in catalyst types, treating the
+   * relation's full output as the columns carried by each row.
+   */
+  private[this] def toCatalystRDD(relation: LogicalRelation, rdd: RDD[Row]): RDD[InternalRow] = {
+    val allColumns = relation.output
+    toCatalystRDD(relation, allColumns, rdd)
+  }
+  /**
+   * Selects Catalyst predicate [[Expression]]s which are convertible into data source [[Filter]]s,
+   * and convert them.
+   *
+   * Predicates that cannot be converted are dropped from the result. Comparisons written as
+   * `literal op attribute` are flipped so that the attribute name always comes first.
+   */
+  protected[sql] def selectFilters(filters: Seq[Expression]) = {
+    def translate(predicate: Expression): Option[Filter] = predicate match {
+      case expressions.EqualTo(a: Attribute, Literal(v, _)) =>
+        Some(sources.EqualTo(a.name, v))
+      case expressions.EqualTo(Literal(v, _), a: Attribute) =>
+        Some(sources.EqualTo(a.name, v))
+      case expressions.GreaterThan(a: Attribute, Literal(v, _)) =>
+        Some(sources.GreaterThan(a.name, v))
+      case expressions.GreaterThan(Literal(v, _), a: Attribute) =>
+        Some(sources.LessThan(a.name, v))
+      case expressions.LessThan(a: Attribute, Literal(v, _)) =>
+        Some(sources.LessThan(a.name, v))
+      case expressions.LessThan(Literal(v, _), a: Attribute) =>
+        Some(sources.GreaterThan(a.name, v))
+      case expressions.GreaterThanOrEqual(a: Attribute, Literal(v, _)) =>
+        Some(sources.GreaterThanOrEqual(a.name, v))
+      case expressions.GreaterThanOrEqual(Literal(v, _), a: Attribute) =>
+        Some(sources.LessThanOrEqual(a.name, v))
+      case expressions.LessThanOrEqual(a: Attribute, Literal(v, _)) =>
+        Some(sources.LessThanOrEqual(a.name, v))
+      case expressions.LessThanOrEqual(Literal(v, _), a: Attribute) =>
+        Some(sources.GreaterThanOrEqual(a.name, v))
+      case expressions.InSet(a: Attribute, set) =>
+        Some(sources.In(a.name, set.toArray))
+      case expressions.IsNull(a: Attribute) =>
+        Some(sources.IsNull(a.name))
+      case expressions.IsNotNull(a: Attribute) =>
+        Some(sources.IsNotNull(a.name))
+      case expressions.And(left, right) =>
+        // Both sides must be convertible. Converting only one side is unsafe once the result is
+        // nested under Not: Not(a AND b) would become Not(a), which rejects rows where
+        // a holds and b does not, although the original predicate accepts them.
+        for {
+          leftFilter <- translate(left)
+          rightFilter <- translate(right)
+        } yield sources.And(leftFilter, rightFilter)
+      case expressions.Or(left, right) =>
+        for {
+          leftFilter <- translate(left)
+          rightFilter <- translate(right)
+        } yield sources.Or(leftFilter, rightFilter)
+      case expressions.Not(child) =>
+        translate(child).map(sources.Not)
+      case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
+        Some(sources.StringStartsWith(a.name, v.toString))
+      case expressions.EndsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
+        Some(sources.StringEndsWith(a.name, v.toString))
+      case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) =>
+        Some(sources.StringContains(a.name, v.toString))
+      case _ => None
+    }
+    filters.flatMap(translate)
+  }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
new file mode 100644
index 0000000..a7123dc
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -0,0 +1,58 @@
+package org.apache.spark.sql.execution.datasources
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.sources.BaseRelation
+/**
+ * Used to link a [[BaseRelation]] in to a logical query plan.
+ */
+private[sql] case class LogicalRelation(relation: BaseRelation)
+  extends LeafNode
+  with MultiInstanceRelation {
+  override val output: Seq[AttributeReference] = relation.schema.toAttributes
+  // Logical Relations are distinct if they have different output for the sake of transformations.
+  override def equals(other: Any): Boolean = other match {
+    case l @ LogicalRelation(otherRelation) => relation == otherRelation && output == l.output
+    case  _ => false
+  }
+  // Hashes the same two components that `equals` compares.
+  override def hashCode: Int = {
+    com.google.common.base.Objects.hashCode(relation, output)
+  }
+  // Unlike `equals`, the output attributes are not compared here: only the underlying relation.
+  override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match {
+    case LogicalRelation(otherRelation) => relation == otherRelation
+    case _ => false
+  }
+  @transient override lazy val statistics: Statistics = Statistics(
+    sizeInBytes = BigInt(relation.sizeInBytes)
+  )
+  /** Used to lookup original attribute capitalization */
+  val attributeMap: AttributeMap[AttributeReference] = AttributeMap(output.map(o => (o, o)))
+  def newInstance(): this.type = LogicalRelation(relation).asInstanceOf[this.type]
+  override def simpleString: String = s"Relation[${output.mkString(",")}] $relation"
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
new file mode 100644
index 0000000..6b4a359
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -0,0 +1,361 @@
+package org.apache.spark.sql.execution.datasources
+import java.lang.{Double => JDouble, Long => JLong}
+import java.math.{BigDecimal => JBigDecimal}
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.Shell
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
+import org.apache.spark.sql.types._
+// A single partition: the values of its partition columns and the path of its directory.
+private[sql] case class Partition(values: InternalRow, path: String)
+// The schema of the partition columns together with the partitions found for a dataset.
+private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition])
+private[sql] object PartitionSpec {
+  // Spec with no partition columns and no partitions.
+  val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[Partition])
+private[sql] object PartitioningUtils {
+  // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't
+  // depend on Hive.
+  // Partition column names paired positionally with the literal parsed for each of them.
+  private[sql] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) {
+    // Every column name must have exactly one literal.
+    require(columnNames.size == literals.size)
+  }
+  /**
+   * Given a group of qualified paths, tries to parse them and returns a partition specification.
+   * For example, given:
+   * {{{
+   *   hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14
+   *   hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28
+   * }}}
+   * it returns:
+   * {{{
+   *   PartitionSpec(
+   *     partitionColumns = StructType(
+   *       StructField(name = "a", dataType = IntegerType, nullable = true),
+   *       StructField(name = "b", dataType = StringType, nullable = true),
+   *       StructField(name = "c", dataType = DoubleType, nullable = true)),
+   *     partitions = Seq(
+   *       Partition(
+   *         values = Row(1, "hello", 3.14),
+   *         path = "hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14"),
+   *       Partition(
+   *         values = Row(2, "world", 6.28),
+   *         path = "hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28")))
+   * }}}
+   */
+  private[sql] def parsePartitions(
+      paths: Seq[Path],
+      defaultPartitionName: String,
+      typeInference: Boolean): PartitionSpec = {
+    // First, we need to parse every partition's path and see if we can find partition values.
+    // Paths without any partition value are dropped by the flatMap.
+    val pathsWithPartitionValues = paths.flatMap { path =>
+      parsePartition(path, defaultPartitionName, typeInference).map(path -> _)
+    }
+    if (pathsWithPartitionValues.isEmpty) {
+      // This dataset is not partitioned.
+      PartitionSpec.emptySpec
+    } else {
+      // This dataset is partitioned. We need to check whether all partitions have the same
+      // partition columns and resolve potential type conflicts.
+      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
+      // Creates the StructType which represents the partition columns.
+      val fields = {
+        val PartitionValues(columnNames, literals) = resolvedPartitionValues.head
+        columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
+          // We always assume partition columns are nullable since we've no idea whether null values
+          // will be appended in the future.
+          StructField(name, dataType, nullable = true)
+        }
+      }
+      // Finally, we create `Partition`s based on paths and resolved partition values.
+      // `resolvePartitions` returns one entry per input pair, in the same order, so zipping
+      // lines each resolved value up with its path.
+      val partitions = resolvedPartitionValues.zip(pathsWithPartitionValues).map {
+        case (PartitionValues(_, literals), (path, _)) =>
+          Partition(InternalRow.fromSeq(literals.map(_.value)), path.toString)
+      }
+      PartitionSpec(StructType(fields), partitions)
+    }
+  }
+  /**
+   * Parses a single partition, returns column names and values of each partition column.  For
+   * example, given:
+   * {{{
+   *   path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
+   * }}}
+   * it returns:
+   * {{{
+   *   PartitionValues(
+   *     Seq("a", "b", "c"),
+   *     Seq(
+   *       Literal.create(42, IntegerType),
+   *       Literal.create("hello", StringType),
+   *       Literal.create(3.14, DoubleType)))
+   * }}}
+   */
+  private[sql] def parsePartition(
+      path: Path,
+      defaultPartitionName: String,
+      typeInference: Boolean): Option[PartitionValues] = {
+    // Columns are collected leaf-first while walking up the path and reversed at the end.
+    val collected = ArrayBuffer.empty[(String, Literal)]
+    var current = path
+    // Old Hadoop versions don't have `Path.isRoot`
+    var done = current.getParent == null
+    while (!done) {
+      val dirName = current.getName
+      // Sometimes (e.g., when speculative task is enabled), temporary directories may be left
+      // uncleaned.  Here we simply ignore them.
+      if (dirName.toLowerCase == "_temporary") {
+        return None
+      }
+      val parsed = parsePartitionColumn(dirName, defaultPartitionName, typeInference)
+      current = current.getParent
+      parsed match {
+        case Some(column) =>
+          collected += column
+          // Keep climbing until the root is reached.
+          done = current.getParent == null
+        case None =>
+          // The first path segment that is not a partition column ends the walk.
+          done = true
+      }
+    }
+    if (collected.nonEmpty) {
+      val (columnNames, values) = collected.reverse.unzip
+      Some(PartitionValues(columnNames, values))
+    } else {
+      None
+    }
+  }
+  // Parses one path segment of the form `name=value` into a column name and its literal.
+  // Segments without an equal sign yield None; an empty name or an empty value fails an
+  // assertion.
+  private def parsePartitionColumn(
+      columnSpec: String,
+      defaultPartitionName: String,
+      typeInference: Boolean): Option[(String, Literal)] = {
+    columnSpec.indexOf('=') match {
+      case -1 =>
+        None
+      case separatorAt =>
+        val columnName = columnSpec.substring(0, separatorAt)
+        assert(columnName.nonEmpty, s"Empty partition column name in '$columnSpec'")
+        val rawColumnValue = columnSpec.substring(separatorAt + 1)
+        assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")
+        Some(
+          columnName ->
+            inferPartitionColumnValue(rawColumnValue, defaultPartitionName, typeInference))
+    }
+  }
+  /**
+   * Resolves possible type conflicts between partitions by up-casting "lower" types.  The up-
+   * casting order is:
+   * {{{
+   *   NullType ->
+   *   IntegerType -> LongType ->
+   *   FloatType -> DoubleType -> DecimalType.Unlimited ->
+   *   StringType
+   * }}}
+   */
+  private[sql] def resolvePartitions(
+      pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
+    if (pathsWithPartitionValues.isEmpty) {
+      Seq.empty
+    } else {
+      // All partitions must share one and the same list of partition column names.
+      val distinctPartColNames = pathsWithPartitionValues.map(_._2.columnNames).distinct
+      assert(
+        distinctPartColNames.size == 1,
+        listConflictingPartitionColumns(pathsWithPartitionValues))
+      // Resolves possible type conflicts for each column
+      val values = pathsWithPartitionValues.map(_._2)
+      val columnCount = values.head.columnNames.size
+      // resolvedValues(i)(p) is the resolved literal of column i in partition p.
+      val resolvedValues = (0 until columnCount).map { i =>
+        resolveTypeConflicts(values.map(_.literals(i)))
+      }
+      // Fills resolved literals back to each partition
+      values.zipWithIndex.map { case (d, index) =>
+        d.copy(literals = resolvedValues.map(_(index)))
+      }
+    }
+  }
+  // Builds the error message reported when partition directories disagree on their partition
+  // column names: every distinct column name list is printed, followed by the paths that carry
+  // them, shortest column list first.
+  private[sql] def listConflictingPartitionColumns(
+      pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = {
+    val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct
+    def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
+      seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value })
+    val partColNamesToPaths = groupByKey(pathWithPartitionValues.map {
+      case (path, partValues) => partValues.columnNames -> path
+    })
+    val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map {
+      case (names, index) =>
+        s"Partition column name list #$index: $names"
+    }
+    // Lists out those non-leaf partition directories that also contain files
+    val suspiciousPaths = distinctPartColNames.sortBy(_.length).flatMap(partColNamesToPaths)
+    s"Conflicting partition column names detected:\n" +
+      distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") +
+      "For partitioned table directories, data files should only live in leaf directories.\n" +
+      "And directories at the same level should have the same partition column name.\n" +
+      "Please check the following directories for unexpected files or " +
+      "inconsistent partition column names:\n" +
+      suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
+  }
+  /**
+   * Turns the raw text of a partition value into a [[Literal]].
+   *
+   * With type inference enabled the value is tried, in order, as [[IntegerType]], [[LongType]],
+   * [[DoubleType]] and [[DecimalType.Unlimited]]. If none of them parses, or if type inference
+   * is disabled, the value becomes a null literal when it equals `defaultPartitionName` and an
+   * unescaped [[StringType]] literal otherwise.
+   */
+  private[sql] def inferPartitionColumnValue(
+      raw: String,
+      defaultPartitionName: String,
+      typeInference: Boolean): Literal = {
+    // Shared non-numeric fallback; a `def` so that it is only evaluated when actually needed.
+    def nullOrString: Literal = {
+      if (raw == defaultPartitionName) {
+        Literal.create(null, NullType)
+      } else {
+        Literal.create(unescapePathName(raw), StringType)
+      }
+    }
+    if (!typeInference) {
+      nullOrString
+    } else {
+      // First tries integral types
+      val integral = Try(Literal.create(Integer.parseInt(raw), IntegerType))
+        .orElse(Try(Literal.create(JLong.parseLong(raw), LongType)))
+      // Then falls back to fractional types
+      val numeric = integral
+        .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
+        .orElse(Try(Literal.create(new JBigDecimal(raw), DecimalType.Unlimited)))
+      // Then falls back to string
+      numeric.getOrElse(nullOrString)
+    }
+  }
+  // Data types listed in up-casting order.
+  // NOTE(review): presumably a later position means a "higher" type when resolving partition
+  // value type conflicts -- confirm against resolveTypeConflicts.
+  private val upCastingOrder: Seq[DataType] =
+    Seq(NullType, IntegerType, LongType, FloatType, DoubleType, DecimalType.Unlimited, StringType)
+  /**
+   * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
+   * types.
+   *
+   * The target type is the one among the literals that sits furthest along `upCastingOrder`;
+   * every literal is then cast to it and re-created with that type.
+   */
+  private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = {
+    val desiredType = {
+      val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
+      // Falls back to string if all values of this column are null or empty string
+      if (topType == NullType) StringType else topType
+    }
+    literals.map { case l @ Literal(_, dataType) =>
+      Literal.create(Cast(l, desiredType).eval(), desiredType)
+    }
+  }
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+  // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils).
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+  // Bit set with one bit set for every character that has to be escaped in a path name.
+  val charToEscape = {
+    val escapeBits = new java.util.BitSet(128)
+    // ASCII 01-1F are HTTP control characters that need to be escaped (this range includes
+    // line feed 0x0A and carriage return 0x0D).
+    val controlChars = '\u0001' to '\u001F'
+    val reservedChars = Seq(
+      '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F', '{', '[', ']', '^')
+    (controlChars ++ reservedChars).foreach(c => escapeBits.set(c.toInt))
+    // A few more characters are escaped on Windows only.
+    if (Shell.WINDOWS) {
+      Seq(' ', '<', '>', '|').foreach(c => escapeBits.set(c.toInt))
+    }
+    escapeBits
+  }
+  // True when `c` lies within the range covered by `charToEscape` and its bit is set there.
+  def needsEscaping(c: Char): Boolean = {
+    val code: Int = c
+    val withinTable = code >= 0 && code < charToEscape.size()
+    withinTable && charToEscape.get(code)
+  }
+  // Escapes a path name: every character for which `needsEscaping` holds is written as '%'
+  // followed by its two-digit lower-case hexadecimal code; all other characters are kept.
+  def escapePathName(path: String): String = {
+    val escaped = new StringBuilder()
+    for (ch <- path) {
+      if (!needsEscaping(ch)) {
+        escaped.append(ch)
+      } else {
+        escaped.append('%')
+        escaped.append(f"${ch.asInstanceOf[Int]}%02x")
+      }
+    }
+    escaped.toString()
+  }
+  // Reverses `escapePathName`: every '%' followed by exactly two hexadecimal digits is replaced
+  // by the character with that code. Any other '%' (too close to the end of the string, or
+  // followed by something that is not two hex digits) is copied through unchanged.
+  def unescapePathName(path: String): String = {
+    val sb = new StringBuilder
+    var i = 0
+    while (i < path.length) {
+      val c = path.charAt(i)
+      // Character.digit returns -1 for anything that is not a digit in the given radix. Checking
+      // both characters individually rejects sign characters, which Integer.valueOf(s, 16)
+      // would accept (so "%+1" or "%-0" must not be decoded as an escape).
+      val high = if (c == '%' && i + 2 < path.length) Character.digit(path.charAt(i + 1), 16) else -1
+      val low = if (high >= 0) Character.digit(path.charAt(i + 2), 16) else -1
+      if (low >= 0) {
+        sb.append(((high << 4) | low).toChar)
+        i += 3
+      } else {
+        sb.append(c)
+        i += 1
+      }
+    }
+    sb.toString()
+  }
+package org.apache.spark.sql.execution.datasources
+import java.util.{Date, UUID}
+import scala.collection.JavaConversions.asScalaIterator
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
+import org.apache.spark._
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.util.SerializableConfiguration
+private[sql] case class InsertIntoDataSource(
+    logicalRelation: LogicalRelation,
+    query: LogicalPlan,
+    overwrite: Boolean)
+  extends RunnableCommand {
+  // Evaluates `query`, inserts its rows into the target relation under the table's schema, and
+  // invalidates cached data for the relation. Returns no rows.
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val insertable = logicalRelation.relation.asInstanceOf[InsertableRelation]
+    val incomingRows = DataFrame(sqlContext, query).queryExecution.toRdd
+    // Apply the schema of the existing table to the new data.
+    val conformed = sqlContext.internalCreateDataFrame(incomingRows, logicalRelation.schema)
+    insertable.insert(conformed, overwrite)
+    // Invalidate the cache.
+    sqlContext.cacheManager.invalidateCache(logicalRelation)
+    Seq.empty[Row]
+  }
+ * A command for writing data to a [[HadoopFsRelation]].  Supports both overwriting and appending.
+ * Writing to dynamic partitions is also supported.  Each [[InsertIntoHadoopFsRelation]] issues a
+ * single write job, and owns a UUID that identifies this job.  Each concrete implementation of
+ * [[HadoopFsRelation]] should use this UUID together with task id to generate unique file path for
+ * each task output file.  This UUID is passed to executor side via a property named
+ * `spark.sql.sources.writeJobUUID`.
+ *
+ * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]]
+ * are used to write to normal tables and tables with dynamic partitions.
+ *
+ * Basic work flow of this command is:
+ *
+ *   1. Driver side setup, including output committer initialization and data source specific
+ *      preparation work for the write job to be issued.
+ *   2. Issues a write job consisting of one or more executor side tasks, each of which writes all
+ *      rows within an RDD partition.
+ *   3. If no exception is thrown in a task, commits that task, otherwise aborts that task;  If any
+ *      exception is thrown during task commitment, also aborts that task.
+ *   4. If all tasks are committed, commit the job, otherwise aborts the job;  If any exception is
+ *      thrown during job commitment, also aborts the job.
+ */
+private[sql] case class InsertIntoHadoopFsRelation(
+    @transient relation: HadoopFsRelation,
+    @transient query: LogicalPlan,
+    mode: SaveMode)
+  extends RunnableCommand {
+  /**
+   * Writes the result of `query` to the single output path of `relation`, honoring `mode`.
+   *
+   * Depending on `mode` and on whether the output path already exists, this either fails,
+   * deletes the existing output first, writes alongside it, or does nothing at all.  Always
+   * returns an empty result.
+   */
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    require(
+      relation.paths.length == 1,
+      s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
+    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+    val outputPath = new Path(relation.paths.head)
+    val fs = outputPath.getFileSystem(hadoopConf)
+    val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    val pathExists = fs.exists(qualifiedOutputPath)
+    // Decide from the save mode and the state of the destination whether to write anything.
+    val doInsertion = (mode, pathExists) match {
+      case (SaveMode.ErrorIfExists, true) =>
+        sys.error(s"path $qualifiedOutputPath already exists.")
+      case (SaveMode.Overwrite, true) =>
+        fs.delete(qualifiedOutputPath, true)
+        true
+      case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
+        true
+      case (SaveMode.Ignore, exists) =>
+        !exists
+    }
+    // If we are appending data to an existing dir.
+    val isAppend = pathExists && (mode == SaveMode.Append)
+    if (doInsertion) {
+      val job = new Job(hadoopConf)
+      job.setOutputKeyClass(classOf[Void])
+      job.setOutputValueClass(classOf[InternalRow])
+      FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
+      // We create a DataFrame by applying the schema of relation to the data to make sure
+      // we are writing data based on the expected schema.
+      val df = {
+        // For partitioned relation r, r.schema's column ordering can be different from the column
+        // ordering of data.logicalPlan (partition columns are all moved after data column). We
+        // need a Project to adjust the ordering, so that inside InsertIntoHadoopFsRelation, we can
+        // safely apply the schema of r.schema to the data.
+        val project = Project(
+          relation.schema.map(field => new UnresolvedAttribute(Seq(field.name))), query)
+        sqlContext.internalCreateDataFrame(
+          DataFrame(sqlContext, project).queryExecution.toRdd, relation.schema)
+      }
+      val partitionColumns = relation.partitionColumns.fieldNames
+      if (partitionColumns.isEmpty) {
+        insert(new DefaultWriterContainer(relation, job, isAppend), df)
+      } else {
+        val writerContainer = new DynamicPartitionWriterContainer(
+          relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
+        insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
+      }
+    }
+    Seq.empty[Row]
+  }
+  /**
+   * Inserts the content of the [[DataFrame]] into a table without any partitioning columns.
+   *
+   * Runs one Spark job over the rows of `df`.  Each task writes its rows through
+   * `writerContainer` and then commits, or aborts on any failure.  When the whole job succeeds
+   * the job is committed and the relation refreshed; otherwise the job is aborted and a
+   * [[SparkException]] wrapping the cause is thrown.
+   */
+  private def insert(writerContainer: BaseWriterContainer, df: DataFrame): Unit = {
+    // Uses local vals for serialization
+    val needsConversion = relation.needConversion
+    val dataSchema = relation.dataSchema
+    // This call shouldn't be put into the `try` block below because it only initializes and
+    // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
+    writerContainer.driverSideSetup()
+    try {
+      df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
+      writerContainer.commitJob()
+      relation.refresh()
+    } catch { case cause: Throwable =>
+      logError("Aborting job.", cause)
+      writerContainer.abortJob()
+      throw new SparkException("Job aborted.", cause)
+    }
+    // Task body: writes every row of one RDD partition, then commits the task.
+    def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
+      // If anything below fails, we should abort the task.
+      try {
+        writerContainer.executorSideSetup(taskContext)
+        val converter: InternalRow => Row = if (needsConversion) {
+          CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
+        } else {
+          r: InternalRow => r.asInstanceOf[Row]
+        }
+        while (iterator.hasNext) {
+          val internalRow = iterator.next()
+          writerContainer.outputWriterForRow(internalRow).write(converter(internalRow))
+        }
+        writerContainer.commitTask()
+      } catch { case cause: Throwable =>
+        logError("Aborting task.", cause)
+        writerContainer.abortTask()
+        throw new SparkException("Task failed while writing rows.", cause)
+      }
+    }
+  }
+  /**
+   * Inserts the content of the [[DataFrame]] into a table with partitioning columns.
+   *
+   * Requires `df` to have exactly the schema of the relation and `partitionColumns` to match
+   * the partition columns declared by the relation.  For every row, the partition columns
+   * (cast to strings) select the output writer and the remaining columns are the data written
+   * through it.  Job and task commit/abort handling mirrors `insert`.
+   */
+  private def insertWithDynamicPartitions(
+      sqlContext: SQLContext,
+      writerContainer: BaseWriterContainer,
+      df: DataFrame,
+      partitionColumns: Array[String]): Unit = {
+    // Uses local vals for serialization
+    val needsConversion = relation.needConversion
+    val dataSchema = relation.dataSchema
+    require(
+      df.schema == relation.schema,
+      s"""DataFrame must have the same schema as the relation to which is inserted.
+         |DataFrame schema: ${df.schema}
+         |Relation schema: ${relation.schema}
+       """.stripMargin)
+    val partitionColumnsInSpec = relation.partitionColumns.fieldNames
+    require(
+      partitionColumnsInSpec.sameElements(partitionColumns),
+      s"""Partition columns mismatch.
+         |Expected: ${partitionColumnsInSpec.mkString(", ")}
+         |Actual: ${partitionColumns.mkString(", ")}
+       """.stripMargin)
+    val output = df.queryExecution.executedPlan.output
+    // Split the output attributes into partition columns and plain data columns.
+    val (partitionOutput, dataOutput) = output.partition(a => partitionColumns.contains(a.name))
+    val codegenEnabled = df.sqlContext.conf.codegenEnabled
+    // This call shouldn't be put into the `try` block below because it only initializes and
+    // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
+    writerContainer.driverSideSetup()
+    try {
+      df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
+      writerContainer.commitJob()
+      relation.refresh()
+    } catch { case cause: Throwable =>
+      logError("Aborting job.", cause)
+      writerContainer.abortJob()
+      throw new SparkException("Job aborted.", cause)
+    }
+    // Task body: routes every row of one RDD partition to the writer of its partition directory.
+    def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
+      // If anything below fails, we should abort the task.
+      try {
+        writerContainer.executorSideSetup(taskContext)
+        // Projects all partition columns and casts them to strings to build partition directories.
+        val partitionCasts = partitionOutput.map(Cast(_, StringType))
+        val partitionProj = newProjection(codegenEnabled, partitionCasts, output)
+        val dataProj = newProjection(codegenEnabled, dataOutput, output)
+        val dataConverter: InternalRow => Row = if (needsConversion) {
+          CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
+        } else {
+          r: InternalRow => r.asInstanceOf[Row]
+        }
+        while (iterator.hasNext) {
+          val internalRow = iterator.next()
+          val partitionPart = partitionProj(internalRow)
+          val dataPart = dataConverter(dataProj(internalRow))
+          writerContainer.outputWriterForRow(partitionPart).write(dataPart)
+        }
+        writerContainer.commitTask()
+      } catch { case cause: Throwable =>
+        logError("Aborting task.", cause)
+        writerContainer.abortTask()
+        throw new SparkException("Task failed while writing rows.", cause)
+      }
+    }
+  }
+  // This is copied from SparkPlan, probably should move this to a more general place.
+  //
+  // Builds a projection for `expressions` over `inputSchema`.  With code generation enabled a
+  // generated projection is attempted first; if generation fails the interpreted projection is
+  // used instead, except when the `spark.testing` system property is set, in which case the
+  // failure is rethrown.
+  private def newProjection(
+      codegenEnabled: Boolean,
+      expressions: Seq[Expression],
+      inputSchema: Seq[Attribute]): Projection = {
+    log.debug(
+      s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
+    def interpreted: Projection = new InterpretedProjection(expressions, inputSchema)
+    if (!codegenEnabled) {
+      interpreted
+    } else {
+      try {
+        GenerateProjection.generate(expressions, inputSchema)
+      } catch {
+        case e: Exception if sys.props.contains("spark.testing") =>
+          throw e
+        case e: Exception =>
+          log.error("failed to generate projection, fallback to interpreted", e)
+          interpreted
+      }
+    }
+  }
+/**
+ * Base class of the writer containers that [[InsertIntoHadoopFsRelation]] uses to write rows.
+ * The container is `Serializable`; members that are not meant to be serialized with it are
+ * marked `@transient` and are (re)created by `driverSideSetup()` / `executorSideSetup()`.
+ */
+private[sql] abstract class BaseWriterContainer(
+    @transient val relation: HadoopFsRelation,
+    @transient job: Job,
+    isAppend: Boolean)
+  extends SparkHadoopMapReduceUtil
+  with Logging
+  with Serializable {
+  // Serializable wrapper around the configuration of `job`.
+  protected val serializableConf = new SerializableConfiguration(job.getConfiguration)
+  // This UUID is used to avoid output file name collision between different appending write jobs.
+  // These jobs may belong to different SparkContext instances. Concrete data source implementations
+  // may use this UUID to generate unique file names (e.g., `part-r-<task-id>-<job-uuid>.parquet`).
+  //  The reason why this ID is used to identify a job rather than a single task output file is
+  // that, speculative tasks must generate the same output file name as the original task.
+  private val uniqueWriteJobId = UUID.randomUUID()
+  // This is only used on driver side.
+  @transient private val jobContext: JobContext = job
+  // The following fields are initialized and used on both driver and executor side.
+  @transient protected var outputCommitter: OutputCommitter = _
+  @transient private var jobId: JobID = _
+  @transient private var taskId: TaskID = _
+  @transient private var taskAttemptId: TaskAttemptID = _
+  @transient protected var taskAttemptContext: TaskAttemptContext = _
+  // The single destination path of the relation; writing to several paths is rejected.
+  protected val outputPath: String = {
+    assert(
+      relation.paths.length == 1,
+      s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
+    relation.paths.head
+  }
+  protected val dataSchema = relation.dataSchema
+  // Both of the following are assigned in `driverSideSetup()`.
+  protected var outputWriterFactory: OutputWriterFactory = _
+  private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _
+  /**
+   * Prepares the write job.  Assigns job/task IDs, publishes them and the write job UUID in the
+   * configuration, lets the relation prepare the job, then creates the task attempt context and
+   * the output committer and calls `setupJob` on the committer.
+   */
+  def driverSideSetup(): Unit = {
+    setupIDs(0, 0, 0)
+    setupConf()
+    // This UUID is sent to executor side together with the serialized `Configuration` object within
+    // the `Job` instance.  `OutputWriters` on the executor side should use this UUID to generate
+    // unique task output files.
+    job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
+    // Order of the following two lines is important.  For Hadoop 1, TaskAttemptContext constructor
+    // clones the Configuration object passed in.  If we initialize the TaskAttemptContext first,
+    // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
+    //
+    // Also, the `prepareJobForWrite` call must happen before initializing output format and output
+    // committer, since their initialization involve the job configuration, which can be potentially
+    // decorated in `prepareJobForWrite`.
+    outputWriterFactory = relation.prepareJobForWrite(job)
+    taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
+    outputFormatClass = job.getOutputFormatClass
+    outputCommitter = newOutputCommitter(taskAttemptContext)
+    outputCommitter.setupJob(jobContext)
+  }
+  /**
+   * Prepares a single task.  Derives the IDs from the stage id, partition id and attempt number
+   * of `taskContext`, recreates the task attempt context and output committer, calls `setupTask`
+   * on the committer and finally lets the subclass create its writers.
+   */
+  def executorSideSetup(taskContext: TaskContext): Unit = {
+    setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber())
+    setupConf()
+    taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
+    outputCommitter = newOutputCommitter(taskAttemptContext)
+    outputCommitter.setupTask(taskAttemptContext)
+    initWriters()
+  }
+  /**
+   * Directory that task output should be written to: the committer's work path when the
+   * committer is a `FileOutputCommitter`, the final output path otherwise.
+   */
+  protected def getWorkPath: String = outputCommitter match {
+    // FileOutputCommitter writes to a temporary location returned by `getWorkPath`.
+    case fileCommitter: MapReduceFileOutputCommitter => fileCommitter.getWorkPath.toString
+    case _ => outputPath
+  }
+  /**
+   * Chooses the output committer for `context`.  When appending, the committer of the output
+   * format is always used.  Otherwise a committer class configured under
+   * `SQLConf.OUTPUT_COMMITTER_CLASS` takes precedence and is instantiated reflectively; if none
+   * is configured the committer of the output format is used.
+   */
+  private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
+    val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
+    if (isAppend) {
+      // If we are appending data to an existing dir, we will only use the output committer
+      // associated with the file output format since it is not safe to use a custom
+      // committer for appending. For example, in S3, direct parquet output committer may
+      // leave partial data in the destination dir when the appending job fails.
+      logInfo(
+        s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
+        "for appending.")
+      defaultOutputCommitter
+    } else {
+      // `null` default: no class configured.
+      val committerClass = context.getConfiguration.getClass(
+        SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
+      Option(committerClass).map { clazz =>
+        logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+        // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+        // has an associated output committer. To override this output committer,
+        // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+        // If a data source needs to override the output committer, it needs to set the
+        // output committer in prepareForWrite method.
+        if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
+          // The specified output committer is a FileOutputCommitter.
+          // So, we will use the FileOutputCommitter-specified constructor.
+          val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+          ctor.newInstance(new Path(outputPath), context)
+        } else {
+          // The specified output committer is just a OutputCommitter.
+          // So, we will use the no-argument constructor.
+          val ctor = clazz.getDeclaredConstructor()
+          ctor.newInstance()
+        }
+      }.getOrElse {
+        // If output committer class is not set, we will use the one associated with the
+        // file output format.
+        logInfo(
+          s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
+        defaultOutputCommitter
+      }
+    }
+  }
+  // Derives the Hadoop job, task and task attempt IDs from the given numbers.  The job ID is
+  // built from the current date; the `true` passed to `TaskID` marks the task as a map task.
+  private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
+    this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
+    this.taskId = new TaskID(this.jobId, true, splitId)
+    this.taskAttemptId = new TaskAttemptID(taskId, attemptId)
+  }
+  // Publishes the IDs created by `setupIDs` in the Hadoop configuration under the classic
+  // `mapred.*` keys, so that code reading the configuration (output formats, committers) sees
+  // which job, task and task attempt it is running for.  Must be called after `setupIDs`.
+  private def setupConf(): Unit = {
+    serializableConf.value.set("mapred.job.id", jobId.toString)
+    serializableConf.value.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
+    serializableConf.value.set("mapred.task.id", taskAttemptId.toString)
+    serializableConf.value.setBoolean("mapred.task.is.map", true)
+    serializableConf.value.setInt("mapred.task.partition", 0)
+  }
+  // Called on executor side when writing rows.  Returns the writer the given row is written with.
+  def outputWriterForRow(row: InternalRow): OutputWriter
+  // Called at the end of `executorSideSetup` so that subclasses can prepare their writers.
+  protected def initWriters(): Unit
+  /** Commits the current task through `SparkHadoopMapRedUtil.commitTask`. */
+  def commitTask(): Unit = {
+    SparkHadoopMapRedUtil.commitTask(
+      outputCommitter, taskAttemptContext, jobId.getId, taskId.getId, taskAttemptId.getId)
+  }
+  /** Aborts the current task attempt on the output committer, if one has been created. */
+  def abortTask(): Unit = {
+    // `outputCommitter` stays null until one of the setup methods has assigned it.
+    Option(outputCommitter).foreach(_.abortTask(taskAttemptContext))
+    logError(s"Task attempt $taskAttemptId aborted.")
+  }
+  /** Commits the whole job on the output committer and logs the committed job ID. */
+  def commitJob(): Unit = {
+    outputCommitter.commitJob(jobContext)
+    logInfo(s"Job $jobId committed.")
+  }
+  /** Aborts the job with state `FAILED` on the output committer, if one has been created. */
+  def abortJob(): Unit = {
+    // `outputCommitter` stays null until one of the setup methods has assigned it.
+    Option(outputCommitter).foreach(_.abortJob(jobContext, JobStatus.State.FAILED))
+    logError(s"Job $jobId aborted.")
+  }
+/**
+ * Writer container that uses one single [[OutputWriter]] per task; every row of the task is
+ * written with that writer.
+ */
+private[sql] class DefaultWriterContainer(
+    @transient relation: HadoopFsRelation,
+    @transient job: Job,
+    isAppend: Boolean)
+  extends BaseWriterContainer(relation, job, isAppend) {
+  // Created in `initWriters()`; null until then.
+  @transient private var writer: OutputWriter = _
+  // Records the final output path in the task configuration and opens the writer on the
+  // work path.
+  override protected def initWriters(): Unit = {
+    taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath)
+    writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
+  }
+  // The row is ignored: all rows share the same writer.
+  override def outputWriterForRow(row: InternalRow): OutputWriter = writer
+  // Closes the writer before committing the task; any failure is rethrown wrapped in a
+  // RuntimeException.
+  override def commitTask(): Unit = {
+    try {
+      assert(writer != null, "OutputWriter instance should have been initialized")
+      writer.close()
+      super.commitTask()
+    } catch { case cause: Throwable =>
+      // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and will
+      // cause `abortTask()` to be invoked.
+      throw new RuntimeException("Failed to commit task", cause)
+    }
+  }
+  // Closes the writer (if any) and then always aborts the task on the committer.
+  override def abortTask(): Unit = {
+    try {
+      // It's possible that the task fails before `writer` gets initialized
+      if (writer != null) {
+        writer.close()
+      }
+    } finally {
+      super.abortTask()
+    }
+  }
+/**
+ * Writer container for tables with partition columns.  It keeps one [[OutputWriter]] per
+ * partition directory, created lazily the first time a row for that partition is seen.
+ */
+private[sql] class DynamicPartitionWriterContainer(
+    @transient relation: HadoopFsRelation,
+    @transient job: Job,
+    partitionColumns: Array[String],
+    defaultPartitionName: String,
+    isAppend: Boolean)
+  extends BaseWriterContainer(relation, job, isAppend) {
+  // All output writers are created on executor side.
+  // Keyed by the relative partition path (`col1=value1/col2=value2`); null until `initWriters()`.
+  @transient protected var outputWriters: java.util.HashMap[String, OutputWriter] = _
+  override protected def initWriters(): Unit = {
+    outputWriters = new java.util.HashMap[String, OutputWriter]
+  }
+  // The `row` argument is supposed to only contain partition column values which have been casted
+  // to strings.
+  //
+  // Builds the relative partition path `col1=value1/col2=value2/...` from the row (null values
+  // become `defaultPartitionName`, other values are path-escaped) and returns the writer cached
+  // for that path, creating and caching a new one on first use.
+  override def outputWriterForRow(row: InternalRow): OutputWriter = {
+    val partitionPath = {
+      val partitionPathBuilder = new StringBuilder
+      var i = 0
+      while (i < partitionColumns.length) {
+        val col = partitionColumns(i)
+        val partitionValueString = {
+          val string = row.getString(i)
+          if (string.eq(null)) defaultPartitionName else PartitioningUtils.escapePathName(string)
+        }
+        if (i > 0) {
+          partitionPathBuilder.append(Path.SEPARATOR_CHAR)
+        }
+        partitionPathBuilder.append(s"$col=$partitionValueString")
+        i += 1
+      }
+      partitionPathBuilder.toString()
+    }
+    val writer = outputWriters.get(partitionPath)
+    if (writer.eq(null)) {
+      // First row of this partition in the task: write below the work path, but expose the
+      // final partition directory in the task configuration.
+      val path = new Path(getWorkPath, partitionPath)
+      taskAttemptContext.getConfiguration.set(
+        "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
+      val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
+      outputWriters.put(partitionPath, newWriter)
+      newWriter
+    } else {
+      writer
+    }
+  }
+  /**
+   * Closes all open writers and forgets them.  Safe to call before `initWriters()` has run, in
+   * which case there is nothing to close.
+   */
+  private def clearOutputWriters(): Unit = {
+    // `outputWriters` is only assigned in `initWriters()`.  A task that fails earlier during
+    // setup is still aborted, and without this guard the abort itself would throw a
+    // NullPointerException.
+    if (outputWriters != null && !outputWriters.isEmpty) {
+      asScalaIterator(outputWriters.values().iterator()).foreach(_.close())
+      outputWriters.clear()
+    }
+  }
+  // Closes all partition writers before committing the task; any failure is rethrown wrapped
+  // in a RuntimeException.
+  override def commitTask(): Unit = {
+    try {
+      clearOutputWriters()
+      super.commitTask()
+    } catch { case cause: Throwable =>
+      throw new RuntimeException("Failed to commit task", cause)
+    }
+  }
+  // Closes all partition writers, then always aborts the task on the committer, even if
+  // closing a writer failed.
+  override def abortTask(): Unit = {
+    try {
+      clearOutputWriters()
+    } finally {
+      super.abortTask()
+    }
+  }

+package org.apache.spark.sql.execution.datasources
+import scala.language.{existentials, implicitConversions}
+import scala.util.matching.Regex
+import org.apache.hadoop.fs.Path
+import org.apache.spark.Logging
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext, SaveMode}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.{AbstractSparkSQLParser, InternalRow}
+import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+/**
+ * A parser for foreign DDL commands.
+ */
+private[sql] class DDLParser(
+    parseQuery: String => LogicalPlan)
+  extends AbstractSparkSQLParser with DataTypeParser with Logging {
+  /**
+   * Parses `input` as a DDL statement.  A [[DDLException]] is always rethrown.  Any other
+   * failure is rethrown when `exceptionOnError` is true; otherwise `input` is handed to
+   * `parseQuery` instead.
+   */
+  def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
+    try {
+      parse(input)
+    } catch {
+      case ddlException: DDLException => throw ddlException
+      // Not a DDL statement as far as this parser can tell: fall back to the query parser.
+      case _ if !exceptionOnError => parseQuery(input)
+      case x: Throwable => throw x
+    }
+  }
+  // `Keyword` is a convention of AbstractSparkSQLParser: at runtime it scans this class via
+  // reflection for all `Keyword` properties in order to construct the SqlLexical object.
+  protected val CREATE = Keyword("CREATE")
+  protected val TEMPORARY = Keyword("TEMPORARY")
+  protected val TABLE = Keyword("TABLE")
+  protected val IF = Keyword("IF")
+  protected val NOT = Keyword("NOT")
+  protected val EXISTS = Keyword("EXISTS")
+  protected val USING = Keyword("USING")
+  protected val OPTIONS = Keyword("OPTIONS")
+  protected val DESCRIBE = Keyword("DESCRIBE")
+  protected val EXTENDED = Keyword("EXTENDED")
+  protected val AS = Keyword("AS")
+  protected val COMMENT = Keyword("COMMENT")
+  protected val REFRESH = Keyword("REFRESH")
+  // Top-level rule: a statement is one of the three supported DDL commands.
+  protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable
+  protected def start: Parser[LogicalPlan] = ddl
+  /**
+   * Parses a `CREATE TABLE ... USING` statement in one of the following forms:
+   *
+   * `CREATE [TEMPORARY] TABLE [IF NOT EXISTS] avroTable
+   * USING org.apache.spark.sql.avro
+   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
+   * or
+   * `CREATE [TEMPORARY] TABLE [IF NOT EXISTS] avroTable(intField int, stringField string...)
+   * USING org.apache.spark.sql.avro
+   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
+   * or
+   * `CREATE [TEMPORARY] TABLE [IF NOT EXISTS] avroTable
+   * USING org.apache.spark.sql.avro
+   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")
+   * AS SELECT ...`
+   *
+   * TEMPORARY cannot be combined with IF NOT EXISTS, and column definitions cannot be combined
+   * with AS SELECT; both combinations raise a [[DDLException]].
+   */
+  protected lazy val createTable: Parser[LogicalPlan] =
+    // TODO: Support database.table.
+    (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~
+      tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
+      case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query =>
+        if (temp.isDefined && allowExisting.isDefined) {
+          throw new DDLException(
+            "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
+        }
+        val options = opts.getOrElse(Map.empty[String, String])
+        if (query.isDefined) {
+          // CREATE TABLE ... AS SELECT
+          if (columns.isDefined) {
+            throw new DDLException(
+              "a CREATE TABLE AS SELECT statement does not allow column definitions.")
+          }
+          // When IF NOT EXISTS clause appears in the query, the save mode will be ignore.
+          val mode = if (allowExisting.isDefined) {
+            SaveMode.Ignore
+          } else if (temp.isDefined) {
+            SaveMode.Overwrite
+          } else {
+            SaveMode.ErrorIfExists
+          }
+          val queryPlan = parseQuery(query.get)
+          CreateTableUsingAsSelect(tableName,
+            provider,
+            temp.isDefined,
+            Array.empty[String],
+            mode,
+            options,
+            queryPlan)
+        } else {
+          // Plain CREATE TABLE, with an optional user specified schema.
+          val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
+          CreateTableUsing(
+            tableName,
+            userSpecifiedSchema,
+            provider,
+            temp.isDefined,
+            options,
+            allowExisting.isDefined,
+            managedIfNoPath = false)
+        }
+    }
+  // A parenthesized, comma-separated list of column definitions.
+  protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"
+  /*
+   * describe [extended] [dbName.]tableName
+   * Produces a DescribeCommand for the (optionally database-qualified) table; the command
+   * outputs col_name, data_type and comment for each column.
+   */
+  protected lazy val describeTable: Parser[LogicalPlan] =
+    (DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident  ^^ {
+      case extended ~ maybeDb ~ table =>
+        // Qualify the table name with the database only when one was given.
+        val nameParts = maybeDb.map(dbName => Seq(dbName, table)).getOrElse(Seq(table))
+        DescribeCommand(UnresolvedRelation(nameParts, None), extended.isDefined)
+    }
+  // REFRESH TABLE [dbName.]tableName -- the database defaults to "default" when omitted.
+  protected lazy val refreshTable: Parser[LogicalPlan] =
+    REFRESH ~> TABLE ~> (ident <~ ".").? ~ ident ^^ {
+      case db ~ table => RefreshTable(db.getOrElse("default"), table)
+    }
+  // A parenthesized, comma-separated list of `key "value"` pairs, collected into a map.
+  protected lazy val options: Parser[Map[String, String]] =
+    "(" ~> repsep(pair, ",") <~ ")" ^^ (_.toMap)
+  // A dot-separated sequence of identifiers, joined back into one qualified name.
+  protected lazy val className: Parser[String] = repsep(ident, ".") ^^ (_.mkString("."))
+  // Turns a regex into a parser that accepts one identifier or keyword token whose whole text
+  // matches the regex, and yields that text.
+  override implicit def regexToParser(regex: Regex): Parser[String] = acceptMatch(
+    s"identifier matching regex $regex", {
+      case lexical.Identifier(str) if regex.unapplySeq(str).isDefined => str
+      case lexical.Keyword(str) if regex.unapplySeq(str).isDefined => str
+    }
+  )
+  // One segment of an option key: a letter or underscore followed by letters, digits or
+  // underscores.
+  protected lazy val optionPart: Parser[String] = "[_a-zA-Z][_a-zA-Z0-9]*".r ^^ {
+    case name => name
+  }
+  // An option key: one or more segments separated by dots, e.g. `a.b.c`.
+  protected lazy val optionName: Parser[String] = repsep(optionPart, ".") ^^ {
+    case parts => parts.mkString(".")
+  }
+  // A single option: a key followed by a string literal value.
+  protected lazy val pair: Parser[(String, String)] =
+    optionName ~ stringLit ^^ { case k ~ v => (k, v) }
+  // A column definition: `name dataType [COMMENT "text"]`.  The comment, when present, is
+  // stored in the field metadata under the lower-cased COMMENT keyword; columns are nullable.
+  protected lazy val column: Parser[StructField] =
+    ident ~ dataType ~ (COMMENT ~> stringLit).?  ^^ { case columnName ~ typ ~ cm =>
+      val meta = cm
+        .map(text => new MetadataBuilder().putString(COMMENT.str.toLowerCase, text).build())
+        .getOrElse(Metadata.empty)
+      StructField(columnName, typ, nullable = true, meta)
+    }
+private[sql] object ResolvedDataSource {
+  // Short provider names that can be used instead of the fully qualified class name.
+  private val builtinSources = Map(
+    "jdbc" -> "org.apache.spark.sql.jdbc.DefaultSource",
+    "json" -> "org.apache.spark.sql.json.DefaultSource",
+    "parquet" -> "org.apache.spark.sql.parquet.DefaultSource",
+    "orc" -> "org.apache.spark.sql.hive.orc.DefaultSource"
+  )
+  /**
+   * Given a provider name, look up the data source class definition.
+   *
+   * Built-in short names are resolved first.  Otherwise the name is tried as a class name and
+   * then as a package containing a `DefaultSource` class; if neither can be loaded an error
+   * is raised.
+   */
+  def lookupDataSource(provider: String): Class[_] = {
+    val loader = Utils.getContextOrSparkClassLoader
+    builtinSources.get(provider) match {
+      case Some(builtinClassName) =>
+        loader.loadClass(builtinClassName)
+      case None =>
+        try {
+          loader.loadClass(provider)
+        } catch {
+          case _: java.lang.ClassNotFoundException =>
+            try {
+              loader.loadClass(provider + ".DefaultSource")
+            } catch {
+              case _: java.lang.ClassNotFoundException =>
+                val message =
+                  if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
+                    "The ORC data source must be used with Hive support enabled."
+                  } else {
+                    s"Failed to load class for data source: $provider"
+                  }
+                sys.error(message)
+            }
+        }
+    }
+  }
+  /**
+   * Create a [[ResolvedDataSource]] for reading data in.
+   *
+   * The provider class is instantiated and asked for a relation.  Which provider interfaces
+   * are acceptable depends on whether a schema was specified by the user; an
+   * [[AnalysisException]] is thrown when the provider does not fit.  For
+   * [[HadoopFsRelationProvider]]s the `path` option is qualified and glob-expanded, and a user
+   * specified schema is split into data columns and partition columns.
+   */
+  def apply(
+      sqlContext: SQLContext,
+      userSpecifiedSchema: Option[StructType],
+      partitionColumns: Array[String],
+      provider: String,
+      options: Map[String, String]): ResolvedDataSource = {
+    val clazz: Class[_] = lookupDataSource(provider)
+    def className: String = clazz.getCanonicalName
+    val relation = userSpecifiedSchema match {
+      case Some(schema: StructType) => clazz.newInstance() match {
+        case dataSource: SchemaRelationProvider =>
+          dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
+        case dataSource: HadoopFsRelationProvider =>
+          val maybePartitionsSchema = if (partitionColumns.isEmpty) {
+            None
+          } else {
+            Some(partitionColumnsSchema(schema, partitionColumns))
+          }
+          val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+          val paths = {
+            val patternPath = new Path(caseInsensitiveOptions("path"))
+            val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+            val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+            SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
+          }
+          // The data schema is the user specified schema without the partition columns.
+          val dataSchema =
+            StructType(schema.filterNot(f => partitionColumns.contains(f.name))).asNullable
+          dataSource.createRelation(
+            sqlContext,
+            paths,
+            Some(dataSchema),
+            maybePartitionsSchema,
+            caseInsensitiveOptions)
+        case dataSource: org.apache.spark.sql.sources.RelationProvider =>
+          throw new AnalysisException(s"$className does not allow user-specified schemas.")
+        case _ =>
+          throw new AnalysisException(s"$className is not a RelationProvider.")
+      }
+      case None => clazz.newInstance() match {
+        case dataSource: RelationProvider =>
+          dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
+        case dataSource: HadoopFsRelationProvider =>
+          val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+          val paths = {
+            val patternPath = new Path(caseInsensitiveOptions("path"))
+            val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+            val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+            SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
+          }
+          dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
+        case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
+          throw new AnalysisException(
+            s"A schema needs to be specified when using $className.")
+        case _ =>
+          throw new AnalysisException(
+            s"$className is neither a RelationProvider nor a FSBasedRelationProvider.")
+      }
+    }
+    new ResolvedDataSource(clazz, relation)
+  }
+  /**
+   * Builds the schema of the partition columns by looking up every name of `partitionColumns`
+   * in `schema`, in the order given; the result is made nullable.  Throws a RuntimeException
+   * if a partition column does not exist in `schema`.
+   */
+  private def partitionColumnsSchema(
+      schema: StructType,
+      partitionColumns: Array[String]): StructType = {
+    StructType(partitionColumns.map { col =>
+      schema.find(_.name == col).getOrElse {
+        throw new RuntimeException(s"Partition column $col not found in schema $schema")
+      }
+    }).asNullable
+  }
+  /**
+   * Create a [[ResolvedDataSource]] for saving the content of the given [[DataFrame]].
+   *
+   * Data containing interval typed columns is rejected.  A [[CreatableRelationProvider]]
+   * saves the data itself; for a [[HadoopFsRelationProvider]] a relation over the (unglobbed)
+   * output path is created and the data is written with [[InsertIntoHadoopFsRelation]].
+   */
+  def apply(
+      sqlContext: SQLContext,
+      provider: String,
+      partitionColumns: Array[String],
+      mode: SaveMode,
+      options: Map[String, String],
+      data: DataFrame): ResolvedDataSource = {
+    if (data.schema.map(_.dataType).exists(_.isInstanceOf[IntervalType])) {
+      throw new AnalysisException("Cannot save interval data type into external storage.")
+    }
+    val clazz: Class[_] = lookupDataSource(provider)
+    val relation = clazz.newInstance() match {
+      case dataSource: CreatableRelationProvider =>
+        dataSource.createRelation(sqlContext, mode, options, data)
+      case dataSource: HadoopFsRelationProvider =>
+        // Don't glob path for the write path.  The contracts here are:
+        //  1. Only one output path can be specified on the write path;
+        //  2. Output path must be a legal HDFS style file system path;
+        //  3. It's OK that the output path doesn't exist yet;
+        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+        val outputPath = {
+          val path = new Path(caseInsensitiveOptions("path"))
+          val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+          path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+        }
+        // The data schema is the schema of the data without the partition columns.
+        val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name)))
+        val r = dataSource.createRelation(
+          sqlContext,
+          Array(outputPath.toString),
+          Some(dataSchema.asNullable),
+          Some(partitionColumnsSchema(data.schema, partitionColumns)),
+          caseInsensitiveOptions)
+        // For partitioned relation r, r.schema's column ordering can be different from the column
+        // ordering of data.logicalPlan (partition columns are all moved after data column).  This
+        // will be adjusted within InsertIntoHadoopFsRelation.
+        sqlContext.executePlan(
+          InsertIntoHadoopFsRelation(
+            r,
+            data.logicalPlan,
+            mode)).toRdd
+        r
+      case _ =>
+        sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
+    }
+    new ResolvedDataSource(clazz, relation)
+  }
+/** Pairs a data source provider class with the [[BaseRelation]] obtained from it. */
+private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
+/**
+ * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
+ * @param table The table to be described.
+ * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
+ *                   It is effective only when the table is a Hive table.
+ */
+private[sql] case class DescribeCommand(
+    table: LogicalPlan,
+    isExtended: Boolean) extends LogicalPlan with Command {
+  override def children: Seq[LogicalPlan] = Seq.empty
+  // Three non-nullable string columns; each carries a "comment" metadata entry describing it.
+  override val output: Seq[Attribute] = Seq(
+    // Column names are based on Hive.
+    AttributeReference("col_name", StringType, nullable = false,
+      new MetadataBuilder().putString("comment", "name of the column").build())(),
+    AttributeReference("data_type", StringType, nullable = false,
+      new MetadataBuilder().putString("comment", "data type of the column").build())(),
+    AttributeReference("comment", StringType, nullable = false,
+      new MetadataBuilder().putString("comment", "comment of the column").build())())
+}
+/**
+  * Used to represent the operation of create table using a data source.
+  * @param allowExisting If it is true, we will do nothing when the table already exists.
+  *                      If it is false, an exception will be thrown
+  */
+private[sql] case class CreateTableUsing(
+    tableName: String,
+    userSpecifiedSchema: Option[StructType],
+    provider: String,
+    temporary: Boolean,
+    options: Map[String, String],
+    allowExisting: Boolean,
+    managedIfNoPath: Boolean) extends LogicalPlan with Command {
+  // A leaf command node: it produces no output attributes and has no child plans.
+  override def output: Seq[Attribute] = Seq.empty
+  override def children: Seq[LogicalPlan] = Seq.empty
+}
+/**
+ * A node used to support CTAS statements and saveAsTable for the data source API.
+ * This node is a [[UnaryNode]] instead of a [[Command]] because we want the analyzer
+ * to be able to analyze the logical plan that will be used to populate the table,
+ * so that [[PreWriteCheck]] can detect cases that are not allowed.
+ */
+private[sql] case class CreateTableUsingAsSelect(
+    tableName: String,
+    provider: String,
+    temporary: Boolean,
+    partitionColumns: Array[String],
+    mode: SaveMode,
+    options: Map[String, String],
+    child: LogicalPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = Seq.empty[Attribute]
+  // TODO: Override resolved after we support databaseName.
+  // override lazy val resolved = databaseName != None && childrenResolved
+}
+/**
+ * Command that resolves a data source from `provider`, `options` and the optional
+ * `userSpecifiedSchema` (with no partition columns), and registers the resulting relation
+ * as a table named `tableName`.
+ */
+private[sql] case class CreateTempTableUsing(
+    tableName: String,
+    userSpecifiedSchema: Option[StructType],
+    provider: String,
+    options: Map[String, String]) extends RunnableCommand {
+  /** Resolves the relation, registers it under `tableName`, and returns no rows. */
+  def run(sqlContext: SQLContext): Seq[InternalRow] = {
+    val resolved = ResolvedDataSource(
+      sqlContext, userSpecifiedSchema, Array.empty[String], provider, options)
+    sqlContext.registerDataFrameAsTable(
+      DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
+    Seq.empty
+  }
+}
+/**
+ * Command that saves the result of `query` through the data source named by `provider`
+ * (using `mode`, `partitionColumns` and `options`), then registers the relation returned by
+ * that save as a table named `tableName`.
+ */
+private[sql] case class CreateTempTableUsingAsSelect(
+    tableName: String,
+    provider: String,
+    partitionColumns: Array[String],
+    mode: SaveMode,
+    options: Map[String, String],
+    query: LogicalPlan) extends RunnableCommand {
+  /** Writes the query result, registers the resulting relation, and returns no rows. */
+  override def run(sqlContext: SQLContext): Seq[InternalRow] = {
+    val df = DataFrame(sqlContext, query)
+    val resolved = ResolvedDataSource(sqlContext, provider, partitionColumns, mode, options, df)
+    sqlContext.registerDataFrameAsTable(
+      DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
+    Seq.empty
+  }
+}
+/**
+ * Command that refreshes the catalog metadata of `databaseName.tableName` and, when the
+ * table's plan is currently cached, drops the cached data and registers the plan for caching
+ * again.
+ */
+private[sql] case class RefreshTable(databaseName: String, tableName: String)
+  extends RunnableCommand {
+  override def run(sqlContext: SQLContext): Seq[InternalRow] = {
+    // Refresh the given table's metadata first.
+    sqlContext.catalog.refreshTable(databaseName, tableName)
+    // If this table is cached as an InMemoryColumnarRelation, drop the original
+    // cached version and make the new version cached lazily.
+    val logicalPlan = sqlContext.catalog.lookupRelation(Seq(databaseName, tableName))
+    // Use lookupCachedData directly since RefreshTable also takes databaseName.
+    val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
+    if (isCached) {
+      // Create a data frame to represent the table.
+      // TODO: Use uncacheTable once it supports database name.
+      val df = DataFrame(sqlContext, logicalPlan)
+      // Uncache the logicalPlan.
+      sqlContext.cacheManager.tryUncacheQuery(df, blocking = true)
+      // Cache it again.
+      sqlContext.cacheManager.cacheQuery(df, Some(tableName))
+    }
+    Seq.empty[InternalRow]
+  }
+}
+/**
+ * Builds a map in which keys are case insensitive
+ */
+protected[sql] class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
+  with Serializable {
+  // Copy of the wrapped map with every key lower-cased; all operations below go through it.
+  val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase))
+  override def get(k: String): Option[String] = baseMap.get(k.toLowerCase)
+  // Note: the result is built from baseMap directly, so it is a plain Map whose new key is
+  // lower-cased, not another CaseInsensitiveMap.
+  override def + [B1 >: String](kv: (String, B1)): Map[String, B1] =
+    baseMap + kv.copy(_1 = kv._1.toLowerCase)
+  override def iterator: Iterator[(String, String)] = baseMap.iterator
+  override def -(key: String): Map[String, String] = baseMap - key.toLowerCase
+}
+/**
+ * The exception thrown from the DDL parser.
+ */
+protected[sql] class DDLException(message: String) extends Exception(message)
+package org.apache.spark.sql.execution.datasources
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.analysis.{Catalog, EliminateSubQueries}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation, InsertableRelation}
+/**
+ * A rule to do pre-insert data type casting and field renaming. Before we insert into
+ * an [[InsertableRelation]], we will use this rule to make sure that
+ * the columns to be inserted have the correct data type and fields have the correct names.
+ */
+private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+      // Wait until children are resolved.
+      case p: LogicalPlan if !p.childrenResolved => p
+      // We are inserting into an InsertableRelation or HadoopFsRelation.
+      case i @ InsertIntoTable(
+      l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation), _, child, _, _) => {
+        // First, make sure the data to be inserted have the same number of fields with the
+        // schema of the relation.
+        if (l.output.size != child.output.size) {
+          sys.error(
+            s"$l requires that the query in the SELECT clause of the INSERT INTO/OVERWRITE " +
+              s"statement generates the same number of columns as its schema.")
+        }
+        castAndRenameChildOutput(i, l.output, child)
+      }
+  }
+  /** If necessary, cast data types and rename fields to the expected types and names. */
+  def castAndRenameChildOutput(
+      insertInto: InsertIntoTable,
+      expectedOutput: Seq[Attribute],
+      child: LogicalPlan): InsertIntoTable = {
+    // Pair each expected attribute with the child's attribute at the same position.
+    val newChildOutput = expectedOutput.zip(child.output).map {
+      case (expected, actual) =>
+        val needCast = !expected.dataType.sameType(actual.dataType)
+        // We want to make sure the field names in the data to be inserted exactly match
+        // names in the schema.
+        val needRename = expected.name != actual.name
+        (needCast, needRename) match {
+          case (true, _) => Alias(Cast(actual, expected.dataType), expected.name)()
+          case (false, true) => Alias(actual, expected.name)()
+          case (_, _) => actual
+        }
+    }
+    // Only add a Project on top of the child when at least one column actually changed.
+    if (newChildOutput == child.output) {
+      insertInto
+    } else {
+      insertInto.copy(child = Project(newChildOutput, child))
+    }
+  }
+}
+/**
+ * A rule to do various checks before inserting into or writing to a data source table.
+ */
+private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => Unit) {
+  /** Aborts analysis by throwing an [[AnalysisException]] carrying `msg`. */
+  def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) }
+  /** Visits every node of `plan` and fails analysis on the first disallowed write found. */
+  def apply(plan: LogicalPlan): Unit = {
+    plan.foreach {
+      case i @ logical.InsertIntoTable(
+        l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite, ifNotExists) =>
+        // Right now, we do not support insert into a data source table with partition specs.
+        if (partition.nonEmpty) {
+          failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
+        } else {
+          // Get all input data source relations of the query.
+          val srcRelations = query.collect {
+            case LogicalRelation(src: BaseRelation) => src
+          }
+          if (srcRelations.contains(t)) {
+            failAnalysis(
+              "Cannot insert overwrite into table that is also being read from.")
+          } else {
+            // OK
+          }
+        }
+      case logical.InsertIntoTable(LogicalRelation(r: HadoopFsRelation), part, _, _, _) =>
+        // We need to make sure the partition columns specified by users do match partition
+        // columns of the relation.
+        val existingPartitionColumns = r.partitionColumns.fieldNames.toSet
+        val specifiedPartitionColumns = part.keySet
+        if (existingPartitionColumns != specifiedPartitionColumns) {
+          failAnalysis(s"Specified partition columns " +
+            s"(${specifiedPartitionColumns.mkString(", ")}) " +
+            s"do not match the partition columns of the table. Please use " +
+            s"(${existingPartitionColumns.mkString(", ")}) as the partition columns.")
+        } else {
+          // OK
+        }
+      case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) =>
+        // The relation in l is not an InsertableRelation.
+        failAnalysis(s"$l does not allow insertion.")
+      case logical.InsertIntoTable(t, _, _, _, _) =>
+        // Reject targets that are not leaf nodes, as well as OneRowRelation and LocalRelation.
+        if (!t.isInstanceOf[LeafNode] || t == OneRowRelation || t.isInstanceOf[LocalRelation]) {
+          failAnalysis(s"Inserting into an RDD-based table is not allowed.")
+        } else {
+          // OK
+        }
+      case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) =>
+        // When the SaveMode is Overwrite, we need to check if the table is an input table of
+        // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
+        if (catalog.tableExists(Seq(tableName))) {
+          // Need to remove SubQuery operator.
+          EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) match {
+            // Only do the check if the table is a data source table
+            // (the relation is a BaseRelation).
+            case l @ LogicalRelation(dest: BaseRelation) =>
+              // Get all input data source relations of the query.
+              val srcRelations = query.collect {
+                case LogicalRelation(src: BaseRelation) => src
+              }
+              if (srcRelations.contains(dest)) {
+                failAnalysis(
+                  s"Cannot overwrite table $tableName that is also being read from.")
+              } else {
+                // OK
+              }
+            case _ => // OK
+          }
+        } else {
+          // OK
+        }
+      case _ => // OK
+    }
+  }
+}
-package org.apache.spark.sql.sources
-import org.apache.spark.{Logging, TaskContext}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.types.{StringType, StructType}
-import org.apache.spark.sql.{SaveMode, Strategy, execution, sources}
-import org.apache.spark.util.{SerializableConfiguration, Utils}
-import org.apache.spark.unsafe.types.UTF8String
- * A Strategy for planning scans over data sources defined using the sources API.
- */
-private[sql] object DataSourceStrategy extends Strategy with Logging {
-  def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan)) =>
-      pruneFilterProjectRaw(
-        l,
-        projects,
-        filters,
-        (a, f) => toCatalystRDD(l, a, t.buildScan(a, f))) :: Nil
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan)) =>
-      pruneFilterProject(
-        l,
-        projects,
-        filters,
-        (a, f) => toCatalystRDD(l, a, t.buildScan(, f))) :: Nil
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan)) =>
-      pruneFilterProject(
-        l,
-        projects,
-        filters,
-        (a, _) => toCatalystRDD(l, a, t.buildScan( :: Nil
-    // Scanning partitioned HadoopFsRelation
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation))
-        if t.partitionSpec.partitionColumns.nonEmpty =>
-      val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
-      logInfo {
-        val total = t.partitionSpec.partitions.length
-        val selected = selectedPartitions.length
-        val percentPruned = (1 - selected.toDouble / total.toDouble) * 100
-        s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
-      }
-      // Only pushes down predicates that do not reference partition columns.
-      val pushedFilters = {
-        val partitionColumnNames =
-        filters.filter { f =>
-          val referencedColumnNames =
-          referencedColumnNames.intersect(partitionColumnNames).isEmpty
-        }
-      }
-      buildPartitionedTableScan(
-        l,
-        projects,
-        pushedFilters,
-        t.partitionSpec.partitionColumns,
-        selectedPartitions) :: Nil
-    // Scanning non-partitioned HadoopFsRelation
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
-      // See buildPartitionedTableScan for the reason that we need to create a shard
-      // broadcast HadoopConf.
-      val sharedHadoopConf = SparkHadoopUtil.get.conf
-      val confBroadcast =
-        t.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
-      pruneFilterProject(
-        l,
-        projects,
-        filters,
-        (a, f) =>
-          toCatalystRDD(l, a, t.buildScan(, f, t.paths, confBroadcast))) :: Nil
-    case l @ LogicalRelation(t: TableScan) =>
-      execution.PhysicalRDD(l.output, toCatalystRDD(l, t.buildScan())) :: Nil
-    case i @ logical.InsertIntoTable(
-      l @ LogicalRelation(t: InsertableRelation), part, query, overwrite, false) if part.isEmpty =>
-      execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
-    case i @ logical.InsertIntoTable(
-      l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) =>
-      val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
-      execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil
-    case _ => Nil
-  }
-  private def buildPartitionedTableScan(
-      logicalRelation: LogicalRelation,
-      projections: Seq[NamedExpression],
-      filters: Seq[Expression],
-      partitionColumns: StructType,
-      partitions: Array[Partition]) = {
-    val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]
-    // Because we are creating one RDD per partition, we need to have a shared HadoopConf.
-    // Otherwise, the cost of broadcasting HadoopConf in every RDD will be high.
-    val sharedHadoopConf = SparkHadoopUtil.get.conf
-    val confBroadcast =
-      relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
-    // Builds RDD[Row]s for each selected partition.
-    val perPartitionRows = { case Partition(partitionValues, dir) =>
-      // The table scan operator (PhysicalRDD) which retrieves required columns from data files.
-      // Notice that the schema of data files, represented by `relation.dataSchema`, may contain
-      // some partition column(s).
-      val scan =
-        pruneFilterProject(
-          logicalRelation,
-          projections,
-          filters,
-          (columns: Seq[Attribute], filters) => {
-            val partitionColNames = partitionColumns.fieldNames
-            // Don't scan any partition columns to save I/O.  Here we are being optimistic and
-            // assuming partition columns data stored in data files are always consistent with those
-            // partition values encoded in partition directory paths.
-            val needed = columns.filterNot(a => partitionColNames.contains(
-            val dataRows =
-              relation.buildScan(, filters, Array(dir), confBroadcast)
-            // Merges data values with partition values.
-            mergeWithPartitionValues(
-              relation.schema,
-    ,
-              partitionColNames,
-              partitionValues,
-              toCatalystRDD(logicalRelation, needed, dataRows))
-          })
-      scan.execute()
-    }
-    val unionedRows =
-      if (perPartitionRows.length == 0) {
-        relation.sqlContext.emptyResult
-      } else {
-        new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows)
-      }
-    execution.PhysicalRDD(, unionedRows)
-  }
-  private def mergeWithPartitionValues(
-      schema: StructType,
-      requiredColumns: Array[String],
-      partitionColumns: Array[String],
-      partitionValues: InternalRow,
-      dataRows: RDD[InternalRow]): RDD[InternalRow] = {
-    val nonPartitionColumns = requiredColumns.filterNot(partitionColumns.contains)
-    // If output columns contain any partition column(s), we need to merge scanned data
-    // columns and requested partition columns to form the final result.
-    if (!requiredColumns.sameElements(nonPartitionColumns)) {
-      val mergers = { case (name, index) =>
-        // To see whether the `index`-th column is a partition column...
-        val i = partitionColumns.indexOf(name)
-        if (i != -1) {
-          // If yes, gets column value from partition values.
-          (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
-            mutableRow(ordinal) = partitionValues(i)
-          }
-        } else {
-          // Otherwise, inherits the value from scanned data.
-          val i = nonPartitionColumns.indexOf(name)
-          (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
-            mutableRow(ordinal) = dataRow(i)
-          }
-        }
-      }
-      // Since we know for sure that this closure is serializable, we can avoid the overhead
-      // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally
-      // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718).
-      val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[InternalRow]) => {
-        val dataTypes =
-        val mutableRow = new SpecificMutableRow(dataTypes)
- { dataRow =>
-          var i = 0
-          while (i < mutableRow.length) {
-            mergers(i)(mutableRow, dataRow, i)
-            i += 1
-          }
-          mutableRow.asInstanceOf[InternalRow]
-        }
-      }
-      // This is an internal RDD whose call site the user should not be concerned with
-      // Since we create many of these (one per partition), the time spent on computing
-      // the call site may add up.
-      Utils.withDummyCallSite(dataRows.sparkContext) {
-        new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false)
-      }
-    } else {
-      dataRows
-    }
-  }
-  protected def prunePartitions(
-      predicates: Seq[Expression],
-      partitionSpec: PartitionSpec): Seq[Partition] = {
-    val PartitionSpec(partitionColumns, partitions) = partitionSpec
-    val partitionColumnNames =
-    val partitionPruningPredicates = predicates.filter {
-    }
-    if (partitionPruningPredicates.nonEmpty) {
-      val predicate =
-        partitionPruningPredicates
-          .reduceOption(expressions.And)
-          .getOrElse(Literal(true))
-      val boundPredicate = InterpretedPredicate.create(predicate.transform {
-        case a: AttributeReference =>
-          val index = partitionColumns.indexWhere( ==
-          BoundReference(index, partitionColumns(index).dataType, nullable = true)
-      })
-      partitions.filter { case Partition(values, _) => boundPredicate(values) }
-    } else {
-      partitions
-    }
-  }
-  // Based on Public API.
-  protected def pruneFilterProject(
-      relation: LogicalRelation,
-      projects: Seq[NamedExpression],
-      filterPredicates: Seq[Expression],
-      scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow]) = {
-    pruneFilterProjectRaw(
-      relation,
-      projects,
-      filterPredicates,
-      (requestedColumns, pushedFilters) => {
-        scanBuilder(requestedColumns, selectFilters(pushedFilters).toArray)
-      })
-  }
-  // Based on Catalyst expressions.
-  protected def pruneFilterProjectRaw(
-      relation: LogicalRelation,
-      projects: Seq[NamedExpression],
-      filterPredicates: Seq[Expression],
-      scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[InternalRow]) = {
-    val projectSet = AttributeSet(projects.flatMap(_.references))
-    val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
-    val filterCondition = filterPredicates.reduceLeftOption(expressions.And)
-    val pushedFilters = { _ transform {
-      case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
-    }}
-    if ( == projects &&
-        projectSet.size == projects.size &&
-        filterSet.subsetOf(projectSet)) {
-      // When it is possible to just use column pruning to get the right projection and
-      // when the columns of this projection are enough to evaluate all filter conditions,
-      // just do a scan followed by a filter, with no extra project.
-      val requestedColumns =
-        projects.asInstanceOf[Seq[Attribute]] // Safe due to if above.
-          .map(relation.attributeMap)            // Match original case of attributes.
-      val scan = execution.PhysicalRDD(,
-        scanBuilder(requestedColumns, pushedFilters))
-, scan)).getOrElse(scan)
-    } else {
-      val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq
-      val scan = execution.PhysicalRDD(requestedColumns,
-        scanBuilder(requestedColumns, pushedFilters))
-      execution.Project(projects,, scan)).getOrElse(scan))
-    }
-  }
-  /**
-   * Convert RDD of Row into RDD of InternalRow with objects in catalyst types
-   */
-  private[this] def toCatalystRDD(
-      relation: LogicalRelation,
-      output: Seq[Attribute],
-      rdd: RDD[Row]): RDD[InternalRow] = {
-    if (relation.relation.needConversion) {
-      execution.RDDConversions.rowToRowRdd(rdd,
-    } else {
-    }
-  }
-  /**
-   * Convert RDD of Row into RDD of InternalRow with objects in catalyst types
-   */
-  private[this] def toCatalystRDD(relation: LogicalRelation, rdd: RDD[Row]): RDD[InternalRow] = {
-    toCatalystRDD(relation, relation.output, rdd)
-  }
-  /**
-   * Selects Catalyst predicate [[Expression]]s which are convertible into data source [[Filter]]s,
-   * and convert them.
-   */
-  protected[sql] def selectFilters(filters: Seq[Expression]) = {
-    def translate(predicate: Expression): Option[Filter] = predicate match {
-      case expressions.EqualTo(a: Attribute, Literal(v, _)) =>
-        Some(sources.EqualTo(, v))
-      case expressions.EqualTo(Literal(v, _), a: Attribute) =>
-        Some(sources.EqualTo(, v))
-      case expressions.GreaterThan(a: Attribute, Literal(v, _)) =>
-        Some(sources.GreaterThan(, v))
-      case expressions.GreaterThan(Literal(v, _), a: Attribute) =>
-        Some(sources.LessThan(, v))
-      case expressions.LessThan(a: Attribute, Literal(v, _)) =>
-        Some(sources.LessThan(, v))
-      case expressions.LessThan(Literal(v, _), a: Attribute) =>
-        Some(sources.GreaterThan(, v))
-      case expressions.GreaterThanOrEqual(a: Attribute, Literal(v, _)) =>
-        Some(sources.GreaterThanOrEqual(, v))
-      case expressions.GreaterThanOrEqual(Literal(v, _), a: Attribute) =>
-        Some(sources.LessThanOrEqual(, v))
-      case expressions.LessThanOrEqual(a: Attribute, Literal(v, _)) =>
-        Some(sources.LessThanOrEqual(, v))
-      case expressions.LessThanOrEqual(Literal(v, _), a: Attribute) =>
-        Some(sources.GreaterThanOrEqual(, v))
-      case expressions.InSet(a: Attribute, set) =>
-        Some(sources.In(, set.toArray))
-      case expressions.IsNull(a: Attribute) =>
-        Some(sources.IsNull(
-      case expressions.IsNotNull(a: Attribute) =>
-        Some(sources.IsNotNull(
-      case expressions.And(left, right) =>
-        (translate(left) ++ translate(right)).reduceOption(sources.And)
-      case expressions.Or(left, right) =>
-        for {
-          leftFilter <- translate(left)
-          rightFilter <- translate(right)
-        } yield sources.Or(leftFilter, rightFilter)
-      case expressions.Not(child) =>
-        translate(child).map(sources.Not)
-      case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
-        Some(sources.StringStartsWith(, v.toString))
-      case expressions.EndsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
-        Some(sources.StringEndsWith(, v.toString))
-      case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) =>
-        Some(sources.StringContains(, v.toString))
-      case _ => None
-    }
-    filters.flatMap(translate)
-  }
-package org.apache.spark.sql.sources
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeMap}
-import org.apache.spark.sql.catalyst.plans.logical.{Statistics, LeafNode, LogicalPlan}
- * Used to link a [[BaseRelation]] in to a logical query plan.
- */
-private[sql] case class LogicalRelation(relation: BaseRelation)
-  extends LeafNode
-  with MultiInstanceRelation {
-  override val output: Seq[AttributeReference] = relation.schema.toAttributes
-  // Logical Relations are distinct if they have different output for the sake of transformations.
-  override def equals(other: Any): Boolean = other match {
-    case l @ LogicalRelation(otherRelation) => relation == otherRelation && output == l.output
-    case  _ => false
-  }
-  override def hashCode: Int = {
-, output)
-  }
-  override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match {
-    case LogicalRelation(otherRelation) => relation == otherRelation
-    case _ => false
-  }
-  @transient override lazy val statistics: Statistics = Statistics(
-    sizeInBytes = BigInt(relation.sizeInBytes)
-  )
-  /** Used to lookup original attribute capitalization */
-  val attributeMap: AttributeMap[AttributeReference] = AttributeMap( => (o, o)))
-  def newInstance(): this.type = LogicalRelation(relation).asInstanceOf[this.type]
-  override def simpleString: String = s"Relation[${output.mkString(",")}] $relation"
-package org.apache.spark.sql.sources
-import java.lang.{Double => JDouble, Float => JFloat, Integer => JInteger, Long => JLong}
-import java.math.{BigDecimal => JBigDecimal}
-import scala.collection.mutable.ArrayBuffer
-import scala.util.Try
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.util.Shell
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
-import org.apache.spark.sql.types._
-private[sql] case class Partition(values: InternalRow, path: String)
-private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition])
-private[sql] object PartitionSpec {
-  val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[Partition])
-private[sql] object PartitioningUtils {
-  // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't
-  // depend on Hive.
-  private[sql] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) {
-    require(columnNames.size == literals.size)
-  }
-  /**
-   * Given a group of qualified paths, tries to parse them and returns a partition specification.
-   * For example, given:
-   * {{{
-   *   hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14
-   *   hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28
-   * }}}
-   * it returns:
-   * {{{
-   *   PartitionSpec(
-   *     partitionColumns = StructType(
-   *       StructField(name = "a", dataType = IntegerType, nullable = true),
-   *       StructField(name = "b", dataType = StringType, nullable = true),
-   *       StructField(name = "c", dataType = DoubleType, nullable = true)),
-   *     partitions = Seq(
-   *       Partition(
-   *         values = Row(1, "hello", 3.14),
-   *         path = "hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14"),
-   *       Partition(
-   *         values = Row(2, "world", 6.28),
-   *         path = "hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28")))
-   * }}}
-   */
-  private[sql] def parsePartitions(
-      paths: Seq[Path],
-      defaultPartitionName: String,
-      typeInference: Boolean): PartitionSpec = {
-    // First, we need to parse every partition's path and see if we can find partition values.
-    val pathsWithPartitionValues = paths.flatMap { path =>
-      parsePartition(path, defaultPartitionName, typeInference).map(path -> _)
-    }
-    if (pathsWithPartitionValues.isEmpty) {
-      // This dataset is not partitioned.
-      PartitionSpec.emptySpec
-    } else {
-      // This dataset is partitioned. We need to check whether all partitions have the same
-      // partition columns and resolve potential type conflicts.
-      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
-      // Creates the StructType which represents the partition columns.
-      val fields = {
-        val PartitionValues(columnNames, literals) = resolvedPartitionValues.head
- { case (name, Literal(_, dataType)) =>
-          // We always assume partition columns are nullable since we've no idea whether null values
-          // will be appended in the future.
-          StructField(name, dataType, nullable = true)
-        }
-      }
-      // Finally, we create `Partition`s based on paths and resolved partition values.
-      val partitions = {
-        case (PartitionValues(_, literals), (path, _)) =>
-          Partition(InternalRow.fromSeq(, path.toString)
-      }
-      PartitionSpec(StructType(fields), partitions)
-    }
-  }
-  /**
-   * Parses a single partition path and returns the names and values of its partition columns.
-   * For example, given:
-   * {{{
-   *   path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
-   * }}}
-   * it returns (with type inference enabled):
-   * {{{
-   *   PartitionValues(
-   *     Seq("a", "b", "c"),
-   *     Seq(
-   *       Literal.create(42, IntegerType),
-   *       Literal.create("hello", StringType),
-   *       Literal.create(3.14, DoubleType)))
-   * }}}
-   * Path components are examined from the last one upwards.  The walk stops at the first
-   * component that is not of the form `name=value`, or when the root is reached.
-   *
-   * @return `None` when no partition column is found, or when a visited component is named
-   *         `_temporary` (compared case-insensitively).
-   */
-  private[sql] def parsePartition(
-      path: Path,
-      defaultPartitionName: String,
-      typeInference: Boolean): Option[PartitionValues] = {
-    val columns = ArrayBuffer.empty[(String, Literal)]
-    // Old Hadoop versions don't have `Path.isRoot`
-    var finished = path.getParent == null
-    var chopped = path
-    while (!finished) {
-      // Sometimes (e.g., when speculative task is enabled), temporary directories may be left
-      // uncleaned.  Here we simply ignore them.  `equalsIgnoreCase` keeps the comparison
-      // independent of the JVM's default locale.
-      if (chopped.getName.equalsIgnoreCase("_temporary")) {
-        return None
-      }
-      val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference)
-      maybeColumn.foreach(columns += _)
-      chopped = chopped.getParent
-      finished = maybeColumn.isEmpty || chopped.getParent == null
-    }
-    if (columns.isEmpty) {
-      None
-    } else {
-      // Columns were collected leaf-first; reverse them to get the directory order.
-      val (columnNames, values) = columns.reverse.unzip
-      Some(PartitionValues(columnNames, values))
-    }
-  }
-  /**
-   * Interprets one path component as a `name=value` partition column, splitting at the first
-   * '='.  Returns `None` when the component contains no '='.  An empty name or an empty value
-   * fails an assertion.
-   */
-  private def parsePartitionColumn(
-      columnSpec: String,
-      defaultPartitionName: String,
-      typeInference: Boolean): Option[(String, Literal)] = {
-    columnSpec.indexOf('=') match {
-      case -1 =>
-        None
-      case separatorAt =>
-        val columnName = columnSpec.substring(0, separatorAt)
-        assert(columnName.nonEmpty, s"Empty partition column name in '$columnSpec'")
-        val rawColumnValue = columnSpec.substring(separatorAt + 1)
-        assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")
-        Some(
-          columnName ->
-            inferPartitionColumnValue(rawColumnValue, defaultPartitionName, typeInference))
-    }
-  }
-  /**
-   * Resolves possible type conflicts between partitions by up-casting "lower" types.  The up-
-   * casting order is:
-   * {{{
-   *   NullType ->
-   *   IntegerType -> LongType ->
-   *   FloatType -> DoubleType -> DecimalType.Unlimited ->
-   *   StringType
-   * }}}
-   * All partitions must share the same list of partition column names; otherwise an assertion
-   * fails with the message built by `listConflictingPartitionColumns`.
-   */
-  private[sql] def resolvePartitions(
-      pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
-    if (pathsWithPartitionValues.isEmpty) {
-      Seq.empty
-    } else {
-      val distinctPartColNames = pathsWithPartitionValues.map(_._2.columnNames).distinct
-      assert(
-        distinctPartColNames.size == 1,
-        listConflictingPartitionColumns(pathsWithPartitionValues))
-      // Resolves possible type conflicts for each column
-      val values = pathsWithPartitionValues.map(_._2)
-      val columnCount = values.head.columnNames.size
-      // resolvedValues(i)(p) is the resolved literal of column i in partition p.
-      val resolvedValues = (0 until columnCount).map { i =>
-        resolveTypeConflicts(values.map(_.literals(i)))
-      }
-      // Fills resolved literals back to each partition
-      values.zipWithIndex.map { case (d, index) =>
-        d.copy(literals = resolvedValues.map(_(index)))
-      }
-    }
-  }
-  /**
-   * Builds the error message reported when partition directories do not agree on their list of
-   * partition column names.  The message lists every distinct column name list and then the
-   * paths that produced them, ordered by the number of partition columns.
-   */
-  private[sql] def listConflictingPartitionColumns(
-      pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = {
-    val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct
-    // Groups the values of a sequence of pairs by their key.
-    def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
-      seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value })
-    val partColNamesToPaths = groupByKey(pathWithPartitionValues.map {
-      case (path, partValues) => partValues.columnNames -> path
-    })
-    val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map {
-      case (names, index) =>
-        s"Partition column name list #$index: $names"
-    }
-    // Lists out those non-leaf partition directories that also contain files
-    val suspiciousPaths = distinctPartColNames.sortBy(_.length).flatMap(partColNamesToPaths)
-    s"Conflicting partition column names detected:\n" +
-      distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") +
-      "For partitioned table directories, data files should only live in leaf directories.\n" +
-      "And directories at the same level should have the same partition column name.\n" +
-      "Please check the following directories for unexpected files or " +
-      "inconsistent partition column names:\n" +
-      suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
-  }
-  /**
-   * Turns the raw text of a partition value into a [[Literal]].  With `typeInference` enabled
-   * the text is tried, in this order, as [[IntegerType]], [[LongType]], [[DoubleType]] and
-   * [[DecimalType.Unlimited]].  If none of these parse, or if inference is disabled, the result
-   * is a null literal of [[NullType]] when the text equals `defaultPartitionName`, and otherwise
-   * a [[StringType]] literal holding the unescaped text.
-   */
-  private[sql] def inferPartitionColumnValue(
-      raw: String,
-      defaultPartitionName: String,
-      typeInference: Boolean): Literal = {
-    // Shared last resort for both the inferring and the non-inferring path.
-    def nullOrString: Literal = {
-      if (raw == defaultPartitionName) {
-        Literal.create(null, NullType)
-      } else {
-        Literal.create(unescapePathName(raw), StringType)
-      }
-    }
-    if (!typeInference) {
-      nullOrString
-    } else {
-      // Integral types first ...
-      val integral = Try(Literal.create(Integer.parseInt(raw), IntegerType))
-        .orElse(Try(Literal.create(JLong.parseLong(raw), LongType)))
-      // ... then fractional types ...
-      val numeric = integral
-        .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
-        .orElse(Try(Literal.create(new JBigDecimal(raw), DecimalType.Unlimited)))
-      // ... and finally null / string.
-      numeric.getOrElse(nullOrString)
-    }
-  }
-  // Data types ordered from "lowest" to "highest"; a later type wins when types conflict.
-  private val upCastingOrder: Seq[DataType] =
-    Seq(NullType, IntegerType, LongType, FloatType, DoubleType, DecimalType.Unlimited, StringType)
-  /**
-   * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
-   * types.  Every literal is cast to the type that ranks highest in `upCastingOrder` among the
-   * given literals.
-   */
-  private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = {
-    val desiredType = {
-      val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
-      // Falls back to string if all values of this column are null or empty string
-      if (topType == NullType) StringType else topType
-    }
-    literals.map { case l @ Literal(_, dataType) =>
-      Literal.create(Cast(l, desiredType).eval(), desiredType)
-    }
-  }
-  //////////////////////////////////////////////////////////////////////////////////////////////////
-  // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils).
-  //////////////////////////////////////////////////////////////////////////////////////////////////
-  /** Bit `i` is set when the character with code `i` has to be escaped in a path name. */
-  val charToEscape = {
-    val escaped = new java.util.BitSet(128)
-    // ASCII 01-1F are HTTP control characters that need to be escaped (this range includes the
-    // line feed and carriage return characters).
-    for (code <- 0x01 to 0x1F) {
-      escaped.set(code)
-    }
-    // Printable characters that need escaping, plus DEL (7F).
-    "\"#%'*/:=?\\\u007F{[]^".foreach(ch => escaped.set(ch))
-    // Extra characters that are only escaped on Windows.
-    if (Shell.WINDOWS) {
-      " <>|".foreach(ch => escaped.set(ch))
-    }
-    escaped
-  }
-  /** Tells whether `c` is one of the characters flagged in `charToEscape`. */
-  def needsEscaping(c: Char): Boolean = {
-    val code = c.toInt
-    val withinTable = code >= 0 && code < charToEscape.size()
-    withinTable && charToEscape.get(code)
-  }
-  /**
-   * Returns `path` with every character for which `needsEscaping` is true replaced by '%'
-   * followed by the character's code as two lower-case hexadecimal digits.
-   */
-  def escapePathName(path: String): String = {
-    val escaped = new StringBuilder()
-    for (ch <- path) {
-      if (!needsEscaping(ch)) {
-        escaped.append(ch)
-      } else {
-        escaped.append('%').append("%02x".format(ch.toInt))
-      }
-    }
-    escaped.toString()
-  }
-  /**
-   * Reverses `escapePathName`: every '%' that is followed by exactly two hexadecimal digits is
-   * replaced by the character with that code.  Any other '%' (too close to the end of the
-   * string, or followed by something that is not two hex digits) is copied through unchanged.
-   */
-  def unescapePathName(path: String): String = {
-    val sb = new StringBuilder
-    var i = 0
-    while (i < path.length) {
-      val c = path.charAt(i)
-      // Decoded character code, or -1 when no valid escape sequence starts at position i.
-      // Each digit is checked on its own so that signed input such as "-0" or "+5", which
-      // Integer.valueOf would accept, is not treated as an escape sequence.
-      val code =
-        if (c == '%' && i + 2 < path.length) {
-          val high = Character.digit(path.charAt(i + 1), 16)
-          val low = Character.digit(path.charAt(i + 2), 16)
-          if (high >= 0 && low >= 0) (high << 4) | low else -1
-        } else {
-          -1
-        }
-      if (code >= 0) {
-        sb.append(code.toChar)
-        i += 3
-      } else {
-        sb.append(c)
-        i += 1
-      }
-    }
-    sb.toString()
-  }
-package org.apache.spark.sql.sources
-import java.text.SimpleDateFormat
-import java.util.Date
-import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.{Partition => SparkPartition, _}
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.executor.DataReadMethod
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
-import org.apache.spark.rdd.{RDD, HadoopRDD}
-import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
-import org.apache.spark.util.{SerializableConfiguration, Utils}
-import scala.reflect.ClassTag
-/**
- * A Spark partition that wraps one Hadoop input split.
- *
- * @param rddId id of the owning RDD; only used by `hashCode`
- * @param index position of this partition within the RDD
- * @param rawSplit the Hadoop split, kept in serializable form in `serializableHadoopSplit`
- */
-private[spark] class SqlNewHadoopPartition(
-    rddId: Int,
-    val index: Int,
-    @transient rawSplit: InputSplit with Writable)
-  extends SparkPartition {
-  val serializableHadoopSplit = new SerializableWritable(rawSplit)
-  override def hashCode(): Int = 41 * (41 + rddId) + index
-}
-/**
- * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
- * sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`).
- * It is based on [[org.apache.spark.rdd.NewHadoopRDD]]. It has three additions.
- * 1. A shared broadcast Hadoop Configuration.
- * 2. An optional closure `initDriverSideJobFuncOpt` that sets configurations at the driver side
- *    to the shared Hadoop Configuration.
- * 3. An optional closure `initLocalJobFuncOpt` that sets configurations at both the driver side
- *    and the executor side to the shared Hadoop Configuration.
- *
- * Note: This RDD is basically a cloned version of [[org.apache.spark.rdd.NewHadoopRDD]] with
- * changes based on [[org.apache.spark.rdd.HadoopRDD]]. In future, this functionality will be
- * folded into core.
- */
-private[sql] class SqlNewHadoopRDD[K, V](
-    @transient sc : SparkContext,
-    broadcastedConf: Broadcast[SerializableConfiguration],
-    @transient initDriverSideJobFuncOpt: Option[Job => Unit],
-    initLocalJobFuncOpt: Option[Job => Unit],
-    inputFormatClass: Class[_ <: InputFormat[K, V]],
-    keyClass: Class[K],
-    valueClass: Class[V])
-  extends RDD[(K, V)](sc, Nil)
-  with SparkHadoopMapReduceUtil
-  with Logging {
-  /**
-   * Creates a fresh Hadoop `Job` from the broadcast configuration and applies
-   * `initLocalJobFuncOpt` to it when that closure is defined.
-   */
-  protected def getJob(): Job = {
-    val conf: Configuration = broadcastedConf.value.value
-    // "new Job" will make a copy of the conf. Then, it is
-    // safe to mutate conf properties with initLocalJobFuncOpt
-    // and initDriverSideJobFuncOpt.
-    val newJob = new Job(conf)
-    initLocalJobFuncOpt.foreach(f => f(newJob))
-    newJob
-  }
-  /**
-   * Returns the configuration of a job obtained from `getJob()`.  When `isDriverSide` is true,
-   * `initDriverSideJobFuncOpt` (if defined) is applied to that job first.
-   */
-  def getConf(isDriverSide: Boolean): Configuration = {
-    val job = getJob()
-    if (isDriverSide) {
-      initDriverSideJobFuncOpt.foreach(f => f(job))
-    }
-    job.getConfiguration
-  }
-  // Timestamp with minute resolution, taken when this RDD object is initialized.
-  private val jobTrackerId: String =
-    new SimpleDateFormat("yyyyMMddHHmm").format(new Date())
-  // Hadoop job id derived from the tracker id above and this RDD's id.
-  @transient protected val jobId = new JobID(jobTrackerId, id)
-  /**
-   * Asks the input format for its splits, using the driver-side configuration, and wraps each
-   * split in a `SqlNewHadoopPartition` whose index is the split's position.
-   */
-  override def getPartitions: Array[SparkPartition] = {
-    val driverConf = getConf(isDriverSide = true)
-    val inputFormat = inputFormatClass.newInstance
-    // Input formats that implement Hadoop's `Configurable` receive the configuration explicitly.
-    inputFormat match {
-      case configurable: Configurable => configurable.setConf(driverConf)
-      case _ =>
-    }
-    val rawSplits = inputFormat.getSplits(newJobContext(driverConf, jobId)).toArray
-    Array.tabulate[SparkPartition](rawSplits.length) { i =>
-      new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
-    }
-  }
-  /**
-   * Reads the records of one partition through a Hadoop `RecordReader` created for the
-   * partition's input split.  The reader is closed by a task-completion listener, and the
-   * task's input metrics (records read, bytes read) are updated along the way.
-   */
-  override def compute(
-      theSplit: SparkPartition,
-      context: TaskContext): InterruptibleIterator[(K, V)] = {
-    val iter = new Iterator[(K, V)] {
-      val split = theSplit.asInstanceOf[SqlNewHadoopPartition]
-      logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = getConf(isDriverSide = false)
-      val inputMetrics = context.taskMetrics
-        .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
-      // Find a function that will return the FileSystem bytes read by this thread. Do this before
-      // creating RecordReader, because RecordReader's constructor might read some bytes
-      val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
-        split.serializableHadoopSplit.value match {
-          case _: FileSplit | _: CombineFileSplit =>
-            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
-          case _ => None
-        }
-      }
-      inputMetrics.setBytesReadCallback(bytesReadCallback)
-      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
-      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
-      val format = inputFormatClass.newInstance
-      // Input formats that implement Hadoop's `Configurable` receive the configuration explicitly.
-      format match {
-        case configurable: Configurable =>
-          configurable.setConf(conf)
-        case _ =>
-      }
-      val reader = format.createRecordReader(
-        split.serializableHadoopSplit.value, hadoopAttemptContext)
-      reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
-      // Register an on-task-completion callback to close the input stream.
-      context.addTaskCompletionListener(context => close())
-      // havePair: a record has been fetched from the reader but not yet handed out by next().
-      var havePair = false
-      // finished: the reader has reported that there are no more records.
-      var finished = false
-      override def hasNext: Boolean = {
-        if (!finished && !havePair) {
-          finished = !reader.nextKeyValue
-          havePair = !finished
-        }
-        !finished
-      }
-      override def next(): (K, V) = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
-        }
-        havePair = false
-        if (!finished) {
-          inputMetrics.incRecordsRead(1)
-        }
-        (reader.getCurrentKey, reader.getCurrentValue)
-      }
-      private def close() {
-        try {
-          reader.close()
-          if (bytesReadCallback.isDefined) {
-            inputMetrics.updateBytesRead()
-          } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
-                     split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
-            // If we can't get the bytes read from the FS stats, fall back to the split size,
-            // which may be inaccurate.
-            try {
-              inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
-            } catch {
-              case e: java.io.IOException =>
-                logWarning("Unable to get input size to set InputMetrics for task", e)
-            }
-          }
-        } catch {
-          case e: Exception => {
-            // Failures while closing are only logged, and not even that during shutdown.
-            if (!Utils.inShutdown()) {
-              logWarning("Exception in RecordReader.close()", e)
-            }
-          }
-        }
-      }
-    }
-    new InterruptibleIterator(context, iter)
-  }
-  /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
-  @DeveloperApi
-  def mapPartitionsWithInputSplit[U: ClassTag](
-      f: (InputSplit, Iterator[(K, V)]) => Iterator[U],
-      preservesPartitioning: Boolean = false): RDD[U] = {
-    // Use the variant defined in this class's companion object: it reads the input split out of
-    // a SqlNewHadoopPartition, which is the partition type this RDD creates.  The unqualified
-    // name would resolve to the helper imported from org.apache.spark.rdd.NewHadoopRDD instead.
-    new SqlNewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
-  }
-  /**
-   * Returns the preferred hosts for a partition.  When `HadoopRDD.SPLIT_INFO_REFLECTIONS` is
-   * available, the split's location info is obtained through it; if that is unavailable or
-   * fails, the split's own locations are used with "localhost" filtered out.
-   */
-  override def getPreferredLocations(hsplit: SparkPartition): Seq[String] = {
-    val hadoopSplit = hsplit.asInstanceOf[SqlNewHadoopPartition].serializableHadoopSplit.value
-    val reflectedHosts = HadoopRDD.SPLIT_INFO_REFLECTIONS.flatMap { reflections =>
-      try {
-        val infos = reflections.newGetLocationInfo.invoke(hadoopSplit).asInstanceOf[Array[AnyRef]]
-        Some(HadoopRDD.convertSplitLocationInfo(infos))
-      } catch {
-        case e : Exception =>
-          logDebug("Failed to use InputSplit#getLocationInfo.", e)
-          None
-      }
-    }
-    reflectedHosts.getOrElse(hadoopSplit.getLocations.filter(_ != "localhost"))
-  }
-  /**
-   * Persists this RDD at the given storage level, logging a warning first when that level
-   * stores deserialized objects.
-   */
-  override def persist(storageLevel: StorageLevel): this.type = {
-    val keepsDeserializedObjects = storageLevel.deserialized
-    if (keepsDeserializedObjects) {
-      val warning = "Caching NewHadoopRDDs as deserialized objects usually leads to undesired" +
-        " behavior because Hadoop's RecordReader reuses the same Writable object for all records." +
-        " Use a map transformation to make copies of the records."
-      logWarning(warning)
-    }
-    super.persist(storageLevel)
-  }
-private[spark] object SqlNewHadoopRDD {
-  /**
-   * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
-   * the given function rather than the index of the partition.
-   *
-   * Every partition handed to `compute` is cast to `SqlNewHadoopPartition`, so `prev` has to be
-   * an RDD whose partitions are of that type.
-   */
-  private[spark] class NewHadoopMapPartitionsWithSplitRDD[U: ClassTag, T: ClassTag](
-      prev: RDD[T],
-      f: (InputSplit, Iterator[T]) => Iterator[U],
-      preservesPartitioning: Boolean = false)
-    extends RDD[U](prev) {
-    // The parent's partitioner is only kept when the caller says `f` preserves partitioning.
-    override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
-    override def getPartitions: Array[SparkPartition] = firstParent[T].partitions
-    override def compute(split: SparkPartition, context: TaskContext): Iterator[U] = {
-      val partition = split.asInstanceOf[SqlNewHadoopPartition]
-      val inputSplit = partition.serializableHadoopSplit.value
-      f(inputSplit, firstParent[T].iterator(split, context))
-    }
-  }
-}

