You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/07/21 20:56:42 UTC
[1/3] spark git commit: [SPARK-8906][SQL] Move all internal data
source classes into execution.datasources.
Repository: spark
Updated Branches:
refs/heads/master 9ba7c64de -> 60c0ce134
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
deleted file mode 100644
index 5c6ef2d..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ /dev/null
@@ -1,581 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources
-
-import java.util.{Date, UUID}
-
-import scala.collection.JavaConversions.asScalaIterator
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
-
-import org.apache.spark._
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.types.StringType
-import org.apache.spark.util.SerializableConfiguration
-
-private[sql] case class InsertIntoDataSource(
- logicalRelation: LogicalRelation,
- query: LogicalPlan,
- overwrite: Boolean)
- extends RunnableCommand {
-
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
- val data = DataFrame(sqlContext, query)
- // Apply the schema of the existing table to the new data.
- val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
- relation.insert(df, overwrite)
-
- // Invalidate the cache.
- sqlContext.cacheManager.invalidateCache(logicalRelation)
-
- Seq.empty[Row]
- }
-}
-
-/**
- * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
- * Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a
- * single write job, and owns a UUID that identifies this job. Each concrete implementation of
- * [[HadoopFsRelation]] should use this UUID together with task id to generate unique file path for
- * each task output file. This UUID is passed to executor side via a property named
- * `spark.sql.sources.writeJobUUID`.
- *
- * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]]
- * are used to write to normal tables and tables with dynamic partitions.
- *
- * Basic work flow of this command is:
- *
- * 1. Driver side setup, including output committer initialization and data source specific
- * preparation work for the write job to be issued.
- * 2. Issues a write job consisting of one or more executor side tasks, each of which writes all
- * rows within an RDD partition.
- * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any
- * exception is thrown during task commitment, also aborts that task.
- * 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is
- * thrown during job commitment, also aborts the job.
- */
-private[sql] case class InsertIntoHadoopFsRelation(
- @transient relation: HadoopFsRelation,
- @transient query: LogicalPlan,
- mode: SaveMode)
- extends RunnableCommand {
-
- override def run(sqlContext: SQLContext): Seq[Row] = {
- require(
- relation.paths.length == 1,
- s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
-
- val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
- val outputPath = new Path(relation.paths.head)
- val fs = outputPath.getFileSystem(hadoopConf)
- val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-
- val pathExists = fs.exists(qualifiedOutputPath)
- val doInsertion = (mode, pathExists) match {
- case (SaveMode.ErrorIfExists, true) =>
- sys.error(s"path $qualifiedOutputPath already exists.")
- case (SaveMode.Overwrite, true) =>
- fs.delete(qualifiedOutputPath, true)
- true
- case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
- true
- case (SaveMode.Ignore, exists) =>
- !exists
- }
- // If we are appending data to an existing dir.
- val isAppend = pathExists && (mode == SaveMode.Append)
-
- if (doInsertion) {
- val job = new Job(hadoopConf)
- job.setOutputKeyClass(classOf[Void])
- job.setOutputValueClass(classOf[InternalRow])
- FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
-
- // We create a DataFrame by applying the schema of relation to the data to make sure
- // we are writing data based on the expected schema.
- val df = {
- // For partitioned relation r, r.schema's column ordering can be different from the column
- // ordering of data.logicalPlan (partition columns are all moved after data column). We
- // need a Project to adjust the ordering, so that inside InsertIntoHadoopFsRelation, we can
- // safely apply the schema of r.schema to the data.
- val project = Project(
- relation.schema.map(field => new UnresolvedAttribute(Seq(field.name))), query)
-
- sqlContext.internalCreateDataFrame(
- DataFrame(sqlContext, project).queryExecution.toRdd, relation.schema)
- }
-
- val partitionColumns = relation.partitionColumns.fieldNames
- if (partitionColumns.isEmpty) {
- insert(new DefaultWriterContainer(relation, job, isAppend), df)
- } else {
- val writerContainer = new DynamicPartitionWriterContainer(
- relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
- insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
- }
- }
-
- Seq.empty[Row]
- }
-
- /**
- * Inserts the content of the [[DataFrame]] into a table without any partitioning columns.
- */
- private def insert(writerContainer: BaseWriterContainer, df: DataFrame): Unit = {
- // Uses local vals for serialization
- val needsConversion = relation.needConversion
- val dataSchema = relation.dataSchema
-
- // This call shouldn't be put into the `try` block below because it only initializes and
- // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
- writerContainer.driverSideSetup()
-
- try {
- df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
- writerContainer.commitJob()
- relation.refresh()
- } catch { case cause: Throwable =>
- logError("Aborting job.", cause)
- writerContainer.abortJob()
- throw new SparkException("Job aborted.", cause)
- }
-
- def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
- // If anything below fails, we should abort the task.
- try {
- writerContainer.executorSideSetup(taskContext)
-
- val converter: InternalRow => Row = if (needsConversion) {
- CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
- } else {
- r: InternalRow => r.asInstanceOf[Row]
- }
- while (iterator.hasNext) {
- val internalRow = iterator.next()
- writerContainer.outputWriterForRow(internalRow).write(converter(internalRow))
- }
-
- writerContainer.commitTask()
- } catch { case cause: Throwable =>
- logError("Aborting task.", cause)
- writerContainer.abortTask()
- throw new SparkException("Task failed while writing rows.", cause)
- }
- }
- }
-
- /**
- * Inserts the content of the [[DataFrame]] into a table with partitioning columns.
- */
- private def insertWithDynamicPartitions(
- sqlContext: SQLContext,
- writerContainer: BaseWriterContainer,
- df: DataFrame,
- partitionColumns: Array[String]): Unit = {
- // Uses local vals for serialization
- val needsConversion = relation.needConversion
- val dataSchema = relation.dataSchema
-
- require(
- df.schema == relation.schema,
- s"""DataFrame must have the same schema as the relation to which is inserted.
- |DataFrame schema: ${df.schema}
- |Relation schema: ${relation.schema}
- """.stripMargin)
-
- val partitionColumnsInSpec = relation.partitionColumns.fieldNames
- require(
- partitionColumnsInSpec.sameElements(partitionColumns),
- s"""Partition columns mismatch.
- |Expected: ${partitionColumnsInSpec.mkString(", ")}
- |Actual: ${partitionColumns.mkString(", ")}
- """.stripMargin)
-
- val output = df.queryExecution.executedPlan.output
- val (partitionOutput, dataOutput) = output.partition(a => partitionColumns.contains(a.name))
- val codegenEnabled = df.sqlContext.conf.codegenEnabled
-
- // This call shouldn't be put into the `try` block below because it only initializes and
- // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
- writerContainer.driverSideSetup()
-
- try {
- df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
- writerContainer.commitJob()
- relation.refresh()
- } catch { case cause: Throwable =>
- logError("Aborting job.", cause)
- writerContainer.abortJob()
- throw new SparkException("Job aborted.", cause)
- }
-
- def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
- // If anything below fails, we should abort the task.
- try {
- writerContainer.executorSideSetup(taskContext)
-
- // Projects all partition columns and casts them to strings to build partition directories.
- val partitionCasts = partitionOutput.map(Cast(_, StringType))
- val partitionProj = newProjection(codegenEnabled, partitionCasts, output)
- val dataProj = newProjection(codegenEnabled, dataOutput, output)
-
- val dataConverter: InternalRow => Row = if (needsConversion) {
- CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
- } else {
- r: InternalRow => r.asInstanceOf[Row]
- }
-
- while (iterator.hasNext) {
- val internalRow = iterator.next()
- val partitionPart = partitionProj(internalRow)
- val dataPart = dataConverter(dataProj(internalRow))
- writerContainer.outputWriterForRow(partitionPart).write(dataPart)
- }
-
- writerContainer.commitTask()
- } catch { case cause: Throwable =>
- logError("Aborting task.", cause)
- writerContainer.abortTask()
- throw new SparkException("Task failed while writing rows.", cause)
- }
- }
- }
-
- // This is copied from SparkPlan, probably should move this to a more general place.
- private def newProjection(
- codegenEnabled: Boolean,
- expressions: Seq[Expression],
- inputSchema: Seq[Attribute]): Projection = {
- log.debug(
- s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
- if (codegenEnabled) {
-
- try {
- GenerateProjection.generate(expressions, inputSchema)
- } catch {
- case e: Exception =>
- if (sys.props.contains("spark.testing")) {
- throw e
- } else {
- log.error("failed to generate projection, fallback to interpreted", e)
- new InterpretedProjection(expressions, inputSchema)
- }
- }
- } else {
- new InterpretedProjection(expressions, inputSchema)
- }
- }
-}
-
-private[sql] abstract class BaseWriterContainer(
- @transient val relation: HadoopFsRelation,
- @transient job: Job,
- isAppend: Boolean)
- extends SparkHadoopMapReduceUtil
- with Logging
- with Serializable {
-
- protected val serializableConf = new SerializableConfiguration(job.getConfiguration)
-
- // This UUID is used to avoid output file name collision between different appending write jobs.
- // These jobs may belong to different SparkContext instances. Concrete data source implementations
- // may use this UUID to generate unique file names (e.g., `part-r-<task-id>-<job-uuid>.parquet`).
- // The reason why this ID is used to identify a job rather than a single task output file is
- // that, speculative tasks must generate the same output file name as the original task.
- private val uniqueWriteJobId = UUID.randomUUID()
-
- // This is only used on driver side.
- @transient private val jobContext: JobContext = job
-
- // The following fields are initialized and used on both driver and executor side.
- @transient protected var outputCommitter: OutputCommitter = _
- @transient private var jobId: JobID = _
- @transient private var taskId: TaskID = _
- @transient private var taskAttemptId: TaskAttemptID = _
- @transient protected var taskAttemptContext: TaskAttemptContext = _
-
- protected val outputPath: String = {
- assert(
- relation.paths.length == 1,
- s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
- relation.paths.head
- }
-
- protected val dataSchema = relation.dataSchema
-
- protected var outputWriterFactory: OutputWriterFactory = _
-
- private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _
-
- def driverSideSetup(): Unit = {
- setupIDs(0, 0, 0)
- setupConf()
-
- // This UUID is sent to executor side together with the serialized `Configuration` object within
- // the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate
- // unique task output files.
- job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
-
- // Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
- // clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
- // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
- //
- // Also, the `prepareJobForWrite` call must happen before initializing output format and output
- // committer, since their initialization involve the job configuration, which can be potentially
- // decorated in `prepareJobForWrite`.
- outputWriterFactory = relation.prepareJobForWrite(job)
- taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
-
- outputFormatClass = job.getOutputFormatClass
- outputCommitter = newOutputCommitter(taskAttemptContext)
- outputCommitter.setupJob(jobContext)
- }
-
- def executorSideSetup(taskContext: TaskContext): Unit = {
- setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber())
- setupConf()
- taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
- outputCommitter = newOutputCommitter(taskAttemptContext)
- outputCommitter.setupTask(taskAttemptContext)
- initWriters()
- }
-
- protected def getWorkPath: String = {
- outputCommitter match {
- // FileOutputCommitter writes to a temporary location returned by `getWorkPath`.
- case f: MapReduceFileOutputCommitter => f.getWorkPath.toString
- case _ => outputPath
- }
- }
-
- private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
- val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
-
- if (isAppend) {
- // If we are appending data to an existing dir, we will only use the output committer
- // associated with the file output format since it is not safe to use a custom
- // committer for appending. For example, in S3, direct parquet output committer may
- // leave partial data in the destination dir when the appending job fails.
- logInfo(
- s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
- "for appending.")
- defaultOutputCommitter
- } else {
- val committerClass = context.getConfiguration.getClass(
- SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
-
- Option(committerClass).map { clazz =>
- logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
-
- // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
- // has an associated output committer. To override this output committer,
- // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
- // If a data source needs to override the output committer, it needs to set the
- // output committer in prepareJobForWrite method.
- if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
- // The specified output committer is a FileOutputCommitter.
- // So, we will use the FileOutputCommitter-specified constructor.
- val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
- ctor.newInstance(new Path(outputPath), context)
- } else {
- // The specified output committer is just an OutputCommitter.
- // So, we will use the no-argument constructor.
- val ctor = clazz.getDeclaredConstructor()
- ctor.newInstance()
- }
- }.getOrElse {
- // If output committer class is not set, we will use the one associated with the
- // file output format.
- logInfo(
- s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
- defaultOutputCommitter
- }
- }
- }
-
- private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
- this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
- this.taskId = new TaskID(this.jobId, true, splitId)
- this.taskAttemptId = new TaskAttemptID(taskId, attemptId)
- }
-
- private def setupConf(): Unit = {
- serializableConf.value.set("mapred.job.id", jobId.toString)
- serializableConf.value.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
- serializableConf.value.set("mapred.task.id", taskAttemptId.toString)
- serializableConf.value.setBoolean("mapred.task.is.map", true)
- serializableConf.value.setInt("mapred.task.partition", 0)
- }
-
- // Called on executor side when writing rows
- def outputWriterForRow(row: InternalRow): OutputWriter
-
- protected def initWriters(): Unit
-
- def commitTask(): Unit = {
- SparkHadoopMapRedUtil.commitTask(
- outputCommitter, taskAttemptContext, jobId.getId, taskId.getId, taskAttemptId.getId)
- }
-
- def abortTask(): Unit = {
- if (outputCommitter != null) {
- outputCommitter.abortTask(taskAttemptContext)
- }
- logError(s"Task attempt $taskAttemptId aborted.")
- }
-
- def commitJob(): Unit = {
- outputCommitter.commitJob(jobContext)
- logInfo(s"Job $jobId committed.")
- }
-
- def abortJob(): Unit = {
- if (outputCommitter != null) {
- outputCommitter.abortJob(jobContext, JobStatus.State.FAILED)
- }
- logError(s"Job $jobId aborted.")
- }
-}
-
-private[sql] class DefaultWriterContainer(
- @transient relation: HadoopFsRelation,
- @transient job: Job,
- isAppend: Boolean)
- extends BaseWriterContainer(relation, job, isAppend) {
-
- @transient private var writer: OutputWriter = _
-
- override protected def initWriters(): Unit = {
- taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath)
- writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
- }
-
- override def outputWriterForRow(row: InternalRow): OutputWriter = writer
-
- override def commitTask(): Unit = {
- try {
- assert(writer != null, "OutputWriter instance should have been initialized")
- writer.close()
- super.commitTask()
- } catch { case cause: Throwable =>
- // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and will
- // cause `abortTask()` to be invoked.
- throw new RuntimeException("Failed to commit task", cause)
- }
- }
-
- override def abortTask(): Unit = {
- try {
- // It's possible that the task fails before `writer` gets initialized
- if (writer != null) {
- writer.close()
- }
- } finally {
- super.abortTask()
- }
- }
-}
-
-private[sql] class DynamicPartitionWriterContainer(
- @transient relation: HadoopFsRelation,
- @transient job: Job,
- partitionColumns: Array[String],
- defaultPartitionName: String,
- isAppend: Boolean)
- extends BaseWriterContainer(relation, job, isAppend) {
-
- // All output writers are created on executor side.
- @transient protected var outputWriters: java.util.HashMap[String, OutputWriter] = _
-
- override protected def initWriters(): Unit = {
- outputWriters = new java.util.HashMap[String, OutputWriter]
- }
-
- // The `row` argument is supposed to only contain partition column values which have been cast
- // to strings.
- override def outputWriterForRow(row: InternalRow): OutputWriter = {
- val partitionPath = {
- val partitionPathBuilder = new StringBuilder
- var i = 0
-
- while (i < partitionColumns.length) {
- val col = partitionColumns(i)
- val partitionValueString = {
- val string = row.getString(i)
- if (string.eq(null)) defaultPartitionName else PartitioningUtils.escapePathName(string)
- }
-
- if (i > 0) {
- partitionPathBuilder.append(Path.SEPARATOR_CHAR)
- }
-
- partitionPathBuilder.append(s"$col=$partitionValueString")
- i += 1
- }
-
- partitionPathBuilder.toString()
- }
-
- val writer = outputWriters.get(partitionPath)
- if (writer.eq(null)) {
- val path = new Path(getWorkPath, partitionPath)
- taskAttemptContext.getConfiguration.set(
- "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
- val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
- outputWriters.put(partitionPath, newWriter)
- newWriter
- } else {
- writer
- }
- }
-
- private def clearOutputWriters(): Unit = {
- if (!outputWriters.isEmpty) {
- asScalaIterator(outputWriters.values().iterator()).foreach(_.close())
- outputWriters.clear()
- }
- }
-
- override def commitTask(): Unit = {
- try {
- clearOutputWriters()
- super.commitTask()
- } catch { case cause: Throwable =>
- throw new RuntimeException("Failed to commit task", cause)
- }
- }
-
- override def abortTask(): Unit = {
- try {
- clearOutputWriters()
- } finally {
- super.abortTask()
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
deleted file mode 100644
index 5a8c97c..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ /dev/null
@@ -1,493 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources
-
-import scala.language.{existentials, implicitConversions}
-import scala.util.matching.Regex
-
-import org.apache.hadoop.fs.Path
-
-import org.apache.spark.Logging
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext, SaveMode}
-import org.apache.spark.util.Utils
-
-/**
- * A parser for foreign DDL commands.
- */
-private[sql] class DDLParser(
- parseQuery: String => LogicalPlan)
- extends AbstractSparkSQLParser with DataTypeParser with Logging {
-
- def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
- try {
- parse(input)
- } catch {
- case ddlException: DDLException => throw ddlException
- case _ if !exceptionOnError => parseQuery(input)
- case x: Throwable => throw x
- }
- }
-
- // Keyword is a convention with AbstractSparkSQLParser, which will scan all of the `Keyword`
- // properties of the class via reflection at runtime to construct the SqlLexical object.
- protected val CREATE = Keyword("CREATE")
- protected val TEMPORARY = Keyword("TEMPORARY")
- protected val TABLE = Keyword("TABLE")
- protected val IF = Keyword("IF")
- protected val NOT = Keyword("NOT")
- protected val EXISTS = Keyword("EXISTS")
- protected val USING = Keyword("USING")
- protected val OPTIONS = Keyword("OPTIONS")
- protected val DESCRIBE = Keyword("DESCRIBE")
- protected val EXTENDED = Keyword("EXTENDED")
- protected val AS = Keyword("AS")
- protected val COMMENT = Keyword("COMMENT")
- protected val REFRESH = Keyword("REFRESH")
-
- protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable
-
- protected def start: Parser[LogicalPlan] = ddl
-
- /**
- * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
- * USING org.apache.spark.sql.avro
- * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
- * or
- * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) [IF NOT EXISTS]
- * USING org.apache.spark.sql.avro
- * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
- * or
- * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
- * USING org.apache.spark.sql.avro
- * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")
- * AS SELECT ...`
- */
- protected lazy val createTable: Parser[LogicalPlan] =
- // TODO: Support database.table.
- (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~
- tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
- case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query =>
- if (temp.isDefined && allowExisting.isDefined) {
- throw new DDLException(
- "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
- }
-
- val options = opts.getOrElse(Map.empty[String, String])
- if (query.isDefined) {
- if (columns.isDefined) {
- throw new DDLException(
- "a CREATE TABLE AS SELECT statement does not allow column definitions.")
- }
- // When IF NOT EXISTS clause appears in the query, the save mode will be SaveMode.Ignore.
- val mode = if (allowExisting.isDefined) {
- SaveMode.Ignore
- } else if (temp.isDefined) {
- SaveMode.Overwrite
- } else {
- SaveMode.ErrorIfExists
- }
-
- val queryPlan = parseQuery(query.get)
- CreateTableUsingAsSelect(tableName,
- provider,
- temp.isDefined,
- Array.empty[String],
- mode,
- options,
- queryPlan)
- } else {
- val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
- CreateTableUsing(
- tableName,
- userSpecifiedSchema,
- provider,
- temp.isDefined,
- options,
- allowExisting.isDefined,
- managedIfNoPath = false)
- }
- }
-
- protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"
-
- /*
- * describe [extended] table avroTable
- * This will display all columns of table `avroTable`, including column_name, column_type, comment
- */
- protected lazy val describeTable: Parser[LogicalPlan] =
- (DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident ^^ {
- case e ~ db ~ tbl =>
- val tblIdentifier = db match {
- case Some(dbName) =>
- Seq(dbName, tbl)
- case None =>
- Seq(tbl)
- }
- DescribeCommand(UnresolvedRelation(tblIdentifier, None), e.isDefined)
- }
-
- protected lazy val refreshTable: Parser[LogicalPlan] =
- REFRESH ~> TABLE ~> (ident <~ ".").? ~ ident ^^ {
- case maybeDatabaseName ~ tableName =>
- RefreshTable(maybeDatabaseName.getOrElse("default"), tableName)
- }
-
- protected lazy val options: Parser[Map[String, String]] =
- "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }
-
- protected lazy val className: Parser[String] = repsep(ident, ".") ^^ { case s => s.mkString(".")}
-
- override implicit def regexToParser(regex: Regex): Parser[String] = acceptMatch(
- s"identifier matching regex $regex", {
- case lexical.Identifier(str) if regex.unapplySeq(str).isDefined => str
- case lexical.Keyword(str) if regex.unapplySeq(str).isDefined => str
- }
- )
-
- protected lazy val optionPart: Parser[String] = "[_a-zA-Z][_a-zA-Z0-9]*".r ^^ {
- case name => name
- }
-
- protected lazy val optionName: Parser[String] = repsep(optionPart, ".") ^^ {
- case parts => parts.mkString(".")
- }
-
- protected lazy val pair: Parser[(String, String)] =
- optionName ~ stringLit ^^ { case k ~ v => (k, v) }
-
- protected lazy val column: Parser[StructField] =
- ident ~ dataType ~ (COMMENT ~> stringLit).? ^^ { case columnName ~ typ ~ cm =>
- val meta = cm match {
- case Some(comment) =>
- new MetadataBuilder().putString(COMMENT.str.toLowerCase, comment).build()
- case None => Metadata.empty
- }
-
- StructField(columnName, typ, nullable = true, meta)
- }
-}
-
-private[sql] object ResolvedDataSource {
-
- private val builtinSources = Map(
- "jdbc" -> "org.apache.spark.sql.jdbc.DefaultSource",
- "json" -> "org.apache.spark.sql.json.DefaultSource",
- "parquet" -> "org.apache.spark.sql.parquet.DefaultSource",
- "orc" -> "org.apache.spark.sql.hive.orc.DefaultSource"
- )
-
- /** Given a provider name, look up the data source class definition. */
- def lookupDataSource(provider: String): Class[_] = {
- val loader = Utils.getContextOrSparkClassLoader
-
- if (builtinSources.contains(provider)) {
- return loader.loadClass(builtinSources(provider))
- }
-
- try {
- loader.loadClass(provider)
- } catch {
- case cnf: java.lang.ClassNotFoundException =>
- try {
- loader.loadClass(provider + ".DefaultSource")
- } catch {
- case cnf: java.lang.ClassNotFoundException =>
- if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
- sys.error("The ORC data source must be used with Hive support enabled.")
- } else {
- sys.error(s"Failed to load class for data source: $provider")
- }
- }
- }
- }
-
- /** Create a [[ResolvedDataSource]] for reading data in. */
- def apply(
- sqlContext: SQLContext,
- userSpecifiedSchema: Option[StructType],
- partitionColumns: Array[String],
- provider: String,
- options: Map[String, String]): ResolvedDataSource = {
- val clazz: Class[_] = lookupDataSource(provider)
- def className: String = clazz.getCanonicalName
- val relation = userSpecifiedSchema match {
- case Some(schema: StructType) => clazz.newInstance() match {
- case dataSource: SchemaRelationProvider =>
- dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
- case dataSource: HadoopFsRelationProvider =>
- val maybePartitionsSchema = if (partitionColumns.isEmpty) {
- None
- } else {
- Some(partitionColumnsSchema(schema, partitionColumns))
- }
-
- val caseInsensitiveOptions = new CaseInsensitiveMap(options)
- val paths = {
- val patternPath = new Path(caseInsensitiveOptions("path"))
- val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
- }
-
- val dataSchema =
- StructType(schema.filterNot(f => partitionColumns.contains(f.name))).asNullable
-
- dataSource.createRelation(
- sqlContext,
- paths,
- Some(dataSchema),
- maybePartitionsSchema,
- caseInsensitiveOptions)
- case dataSource: org.apache.spark.sql.sources.RelationProvider =>
- throw new AnalysisException(s"$className does not allow user-specified schemas.")
- case _ =>
- throw new AnalysisException(s"$className is not a RelationProvider.")
- }
-
- case None => clazz.newInstance() match {
- case dataSource: RelationProvider =>
- dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
- case dataSource: HadoopFsRelationProvider =>
- val caseInsensitiveOptions = new CaseInsensitiveMap(options)
- val paths = {
- val patternPath = new Path(caseInsensitiveOptions("path"))
- val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
- }
- dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
- case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
- throw new AnalysisException(
- s"A schema needs to be specified when using $className.")
- case _ =>
- throw new AnalysisException(
- s"$className is neither a RelationProvider nor a FSBasedRelationProvider.")
- }
- }
- new ResolvedDataSource(clazz, relation)
- }
-
- private def partitionColumnsSchema(
- schema: StructType,
- partitionColumns: Array[String]): StructType = {
- StructType(partitionColumns.map { col =>
- schema.find(_.name == col).getOrElse {
- throw new RuntimeException(s"Partition column $col not found in schema $schema")
- }
- }).asNullable
- }
-
- /** Create a [[ResolvedDataSource]] for saving the content of the given [[DataFrame]]. */
- def apply(
- sqlContext: SQLContext,
- provider: String,
- partitionColumns: Array[String],
- mode: SaveMode,
- options: Map[String, String],
- data: DataFrame): ResolvedDataSource = {
- if (data.schema.map(_.dataType).exists(_.isInstanceOf[IntervalType])) {
- throw new AnalysisException("Cannot save interval data type into external storage.")
- }
- val clazz: Class[_] = lookupDataSource(provider)
- val relation = clazz.newInstance() match {
- case dataSource: CreatableRelationProvider =>
- dataSource.createRelation(sqlContext, mode, options, data)
- case dataSource: HadoopFsRelationProvider =>
- // Don't glob path for the write path. The contracts here are:
- // 1. Only one output path can be specified on the write path;
- // 2. Output path must be a legal HDFS style file system path;
- // 3. It's OK that the output path doesn't exist yet;
- val caseInsensitiveOptions = new CaseInsensitiveMap(options)
- val outputPath = {
- val path = new Path(caseInsensitiveOptions("path"))
- val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- path.makeQualified(fs.getUri, fs.getWorkingDirectory)
- }
- val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name)))
- val r = dataSource.createRelation(
- sqlContext,
- Array(outputPath.toString),
- Some(dataSchema.asNullable),
- Some(partitionColumnsSchema(data.schema, partitionColumns)),
- caseInsensitiveOptions)
-
- // For partitioned relation r, r.schema's column ordering can be different from the column
- // ordering of data.logicalPlan (partition columns are all moved after data column). This
- // will be adjusted within InsertIntoHadoopFsRelation.
- sqlContext.executePlan(
- InsertIntoHadoopFsRelation(
- r,
- data.logicalPlan,
- mode)).toRdd
- r
- case _ =>
- sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
- }
- new ResolvedDataSource(clazz, relation)
- }
-}
-
-private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
-
-/**
- * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
- * @param table The table to be described.
- * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
- * It is effective only when the table is a Hive table.
- */
-private[sql] case class DescribeCommand(
- table: LogicalPlan,
- isExtended: Boolean) extends LogicalPlan with Command {
-
- override def children: Seq[LogicalPlan] = Seq.empty
- override val output: Seq[Attribute] = Seq(
- // Column names are based on Hive.
- AttributeReference("col_name", StringType, nullable = false,
- new MetadataBuilder().putString("comment", "name of the column").build())(),
- AttributeReference("data_type", StringType, nullable = false,
- new MetadataBuilder().putString("comment", "data type of the column").build())(),
- AttributeReference("comment", StringType, nullable = false,
- new MetadataBuilder().putString("comment", "comment of the column").build())())
-}
-
-/**
- * Used to represent the operation of create table using a data source.
- * @param allowExisting If it is true, we will do nothing when the table already exists.
- * If it is false, an exception will be thrown
- */
-private[sql] case class CreateTableUsing(
- tableName: String,
- userSpecifiedSchema: Option[StructType],
- provider: String,
- temporary: Boolean,
- options: Map[String, String],
- allowExisting: Boolean,
- managedIfNoPath: Boolean) extends LogicalPlan with Command {
-
- override def output: Seq[Attribute] = Seq.empty
- override def children: Seq[LogicalPlan] = Seq.empty
-}
-
-/**
- * A node used to support CTAS statements and saveAsTable for the data source API.
- * This node is a [[UnaryNode]] instead of a [[Command]] because we want the analyzer
- * can analyze the logical plan that will be used to populate the table.
- * So, [[PreWriteCheck]] can detect cases that are not allowed.
- */
-private[sql] case class CreateTableUsingAsSelect(
- tableName: String,
- provider: String,
- temporary: Boolean,
- partitionColumns: Array[String],
- mode: SaveMode,
- options: Map[String, String],
- child: LogicalPlan) extends UnaryNode {
- override def output: Seq[Attribute] = Seq.empty[Attribute]
- // TODO: Override resolved after we support databaseName.
- // override lazy val resolved = databaseName != None && childrenResolved
-}
-
-private[sql] case class CreateTempTableUsing(
- tableName: String,
- userSpecifiedSchema: Option[StructType],
- provider: String,
- options: Map[String, String]) extends RunnableCommand {
-
- def run(sqlContext: SQLContext): Seq[InternalRow] = {
- val resolved = ResolvedDataSource(
- sqlContext, userSpecifiedSchema, Array.empty[String], provider, options)
- sqlContext.registerDataFrameAsTable(
- DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
- Seq.empty
- }
-}
-
-private[sql] case class CreateTempTableUsingAsSelect(
- tableName: String,
- provider: String,
- partitionColumns: Array[String],
- mode: SaveMode,
- options: Map[String, String],
- query: LogicalPlan) extends RunnableCommand {
-
- override def run(sqlContext: SQLContext): Seq[InternalRow] = {
- val df = DataFrame(sqlContext, query)
- val resolved = ResolvedDataSource(sqlContext, provider, partitionColumns, mode, options, df)
- sqlContext.registerDataFrameAsTable(
- DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
-
- Seq.empty
- }
-}
-
-private[sql] case class RefreshTable(databaseName: String, tableName: String)
- extends RunnableCommand {
-
- override def run(sqlContext: SQLContext): Seq[InternalRow] = {
- // Refresh the given table's metadata first.
- sqlContext.catalog.refreshTable(databaseName, tableName)
-
- // If this table is cached as a InMemoryColumnarRelation, drop the original
- // cached version and make the new version cached lazily.
- val logicalPlan = sqlContext.catalog.lookupRelation(Seq(databaseName, tableName))
- // Use lookupCachedData directly since RefreshTable also takes databaseName.
- val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
- if (isCached) {
- // Create a data frame to represent the table.
- // TODO: Use uncacheTable once it supports database name.
- val df = DataFrame(sqlContext, logicalPlan)
- // Uncache the logicalPlan.
- sqlContext.cacheManager.tryUncacheQuery(df, blocking = true)
- // Cache it again.
- sqlContext.cacheManager.cacheQuery(df, Some(tableName))
- }
-
- Seq.empty[InternalRow]
- }
-}
-
-/**
- * Builds a map in which keys are case insensitive
- */
-protected[sql] class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
- with Serializable {
-
- val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase))
-
- override def get(k: String): Option[String] = baseMap.get(k.toLowerCase)
-
- override def + [B1 >: String](kv: (String, B1)): Map[String, B1] =
- baseMap + kv.copy(_1 = kv._1.toLowerCase)
-
- override def iterator: Iterator[(String, String)] = baseMap.iterator
-
- override def -(key: String): Map[String, String] = baseMap - key.toLowerCase
-}
-
-/**
- * The exception thrown from the DDL parser.
- */
-protected[sql] class DDLException(message: String) extends Exception(message)
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
index 24e86ca..4d942e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
@@ -17,6 +17,10 @@
package org.apache.spark.sql.sources
+////////////////////////////////////////////////////////////////////////////////////////////////////
+// This file defines all the filters that we can push down to the data sources.
+////////////////////////////////////////////////////////////////////////////////////////////////////
+
/**
* A filter predicate for data sources.
*
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 2cd8b35..7cd005b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.sources
import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
import scala.util.Try
import org.apache.hadoop.conf.Configuration
@@ -33,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.execution.RDDConversions
+import org.apache.spark.sql.execution.datasources.{PartitioningUtils, PartitionSpec, Partition}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql._
import org.apache.spark.util.SerializableConfiguration
@@ -523,7 +523,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
})
}
- private[sources] final def buildScan(
+ private[sql] final def buildScan(
requiredColumns: Array[String],
filters: Array[Filter],
inputPaths: Array[String],
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
deleted file mode 100644
index 40ee048..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources
-
-import org.apache.spark.sql.{SaveMode, AnalysisException}
-import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, Catalog}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Alias}
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.types.DataType
-
-/**
- * A rule to do pre-insert data type casting and field renaming. Before we insert into
- * an [[InsertableRelation]], we will use this rule to make sure that
- * the columns to be inserted have the correct data type and fields have the correct names.
- */
-private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- // Wait until children are resolved.
- case p: LogicalPlan if !p.childrenResolved => p
-
- // We are inserting into an InsertableRelation or HadoopFsRelation.
- case i @ InsertIntoTable(
- l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation), _, child, _, _) => {
- // First, make sure the data to be inserted have the same number of fields with the
- // schema of the relation.
- if (l.output.size != child.output.size) {
- sys.error(
- s"$l requires that the query in the SELECT clause of the INSERT INTO/OVERWRITE " +
- s"statement generates the same number of columns as its schema.")
- }
- castAndRenameChildOutput(i, l.output, child)
- }
- }
-
- /** If necessary, cast data types and rename fields to the expected types and names. */
- def castAndRenameChildOutput(
- insertInto: InsertIntoTable,
- expectedOutput: Seq[Attribute],
- child: LogicalPlan): InsertIntoTable = {
- val newChildOutput = expectedOutput.zip(child.output).map {
- case (expected, actual) =>
- val needCast = !expected.dataType.sameType(actual.dataType)
- // We want to make sure the filed names in the data to be inserted exactly match
- // names in the schema.
- val needRename = expected.name != actual.name
- (needCast, needRename) match {
- case (true, _) => Alias(Cast(actual, expected.dataType), expected.name)()
- case (false, true) => Alias(actual, expected.name)()
- case (_, _) => actual
- }
- }
-
- if (newChildOutput == child.output) {
- insertInto
- } else {
- insertInto.copy(child = Project(newChildOutput, child))
- }
- }
-}
-
-/**
- * A rule to do various checks before inserting into or writing to a data source table.
- */
-private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => Unit) {
- def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) }
-
- def apply(plan: LogicalPlan): Unit = {
- plan.foreach {
- case i @ logical.InsertIntoTable(
- l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite, ifNotExists) =>
- // Right now, we do not support insert into a data source table with partition specs.
- if (partition.nonEmpty) {
- failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
- } else {
- // Get all input data source relations of the query.
- val srcRelations = query.collect {
- case LogicalRelation(src: BaseRelation) => src
- }
- if (srcRelations.contains(t)) {
- failAnalysis(
- "Cannot insert overwrite into table that is also being read from.")
- } else {
- // OK
- }
- }
-
- case logical.InsertIntoTable(LogicalRelation(r: HadoopFsRelation), part, _, _, _) =>
- // We need to make sure the partition columns specified by users do match partition
- // columns of the relation.
- val existingPartitionColumns = r.partitionColumns.fieldNames.toSet
- val specifiedPartitionColumns = part.keySet
- if (existingPartitionColumns != specifiedPartitionColumns) {
- failAnalysis(s"Specified partition columns " +
- s"(${specifiedPartitionColumns.mkString(", ")}) " +
- s"do not match the partition columns of the table. Please use " +
- s"(${existingPartitionColumns.mkString(", ")}) as the partition columns.")
- } else {
- // OK
- }
-
- case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) =>
- // The relation in l is not an InsertableRelation.
- failAnalysis(s"$l does not allow insertion.")
-
- case logical.InsertIntoTable(t, _, _, _, _) =>
- if (!t.isInstanceOf[LeafNode] || t == OneRowRelation || t.isInstanceOf[LocalRelation]) {
- failAnalysis(s"Inserting into an RDD-based table is not allowed.")
- } else {
- // OK
- }
-
- case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) =>
- // When the SaveMode is Overwrite, we need to check if the table is an input table of
- // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
- if (catalog.tableExists(Seq(tableName))) {
- // Need to remove SubQuery operator.
- EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) match {
- // Only do the check if the table is a data source table
- // (the relation is a BaseRelation).
- case l @ LogicalRelation(dest: BaseRelation) =>
- // Get all input data source relations of the query.
- val srcRelations = query.collect {
- case LogicalRelation(src: BaseRelation) => src
- }
- if (srcRelations.contains(dest)) {
- failAnalysis(
- s"Cannot overwrite table $tableName that is also being read from.")
- } else {
- // OK
- }
-
- case _ => // OK
- }
- } else {
- // OK
- }
-
- case _ => // OK
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 3475f9d..1d04513 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -26,8 +26,8 @@ import org.scalactic.Tolerance._
import org.apache.spark.sql.{QueryTest, Row, SQLConf}
import org.apache.spark.sql.TestData._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.json.InferSchema.compatibleType
-import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index a2763c7..23df102 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -24,7 +24,7 @@ import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.sources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, DataFrame, QueryTest, Row, SQLConf}
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 37b0a9f..4f98776 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -28,11 +28,11 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.sources.PartitioningUtils._
-import org.apache.spark.sql.sources.{LogicalRelation, Partition, PartitionSpec}
+import org.apache.spark.sql.execution.datasources.{LogicalRelation, PartitionSpec, Partition, PartitioningUtils}
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.unsafe.types.UTF8String
+import PartitioningUtils._
// The data where the partitioning key exists only in the directory structure.
case class ParquetData(intField: Int, stringField: String)
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index a710884..1907e64 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -22,6 +22,7 @@ import java.io.{File, IOException}
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.execution.datasources.DDLException
import org.apache.spark.util.Utils
class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 296b0d6..3cbf546 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.sources
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.datasources.ResolvedDataSource
class ResolvedDataSourceSuite extends SparkFunSuite {
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 4684d48..cec7685 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -44,9 +44,9 @@ import org.apache.spark.sql.catalyst.ParserDialect
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUDFs, SetCommand}
+import org.apache.spark.sql.execution.datasources.{PreWriteCheck, PreInsertCastAndRename, DataSourceStrategy}
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
-import org.apache.spark.sql.sources.DataSourceStrategy
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -384,11 +384,11 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
catalog.PreInsertionCasts ::
ExtractPythonUDFs ::
ResolveHiveWindowFunction ::
- sources.PreInsertCastAndRename ::
+ PreInsertCastAndRename ::
Nil
override val extendedCheckRules = Seq(
- sources.PreWriteCheck(catalog)
+ PreWriteCheck(catalog)
)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index b15261b..0a2121c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.hive
+import scala.collection.JavaConversions._
+
import com.google.common.base.Objects
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
@@ -28,6 +30,7 @@ import org.apache.hadoop.hive.ql.metadata._
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.spark.Logging
+import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
import org.apache.spark.sql.catalyst.expressions._
@@ -35,14 +38,12 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.execution.datasources
+import org.apache.spark.sql.execution.datasources.{Partition => ParquetPartition, PartitionSpec, CreateTableUsingAsSelect, ResolvedDataSource, LogicalRelation}
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.parquet.ParquetRelation2
-import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode, sources}
-/* Implicit conversions */
-import scala.collection.JavaConversions._
private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: HiveContext)
extends Catalog with Logging {
@@ -278,7 +279,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
parquetRelation.paths.toSet == pathsInMetastore.toSet &&
logical.schema.sameType(metastoreSchema) &&
parquetRelation.partitionSpec == partitionSpecInMetastore.getOrElse {
- PartitionSpec(StructType(Nil), Array.empty[sources.Partition])
+ PartitionSpec(StructType(Nil), Array.empty[datasources.Partition])
}
if (useCached) {
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 7fc517b..f557450 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.execution.ExplainCommand
-import org.apache.spark.sql.sources.DescribeCommand
+import org.apache.spark.sql.execution.datasources.DescribeCommand
import org.apache.spark.sql.hive.HiveShim._
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable, HiveScriptIOSchema}
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 9638a82..a22c329 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -30,9 +30,9 @@ import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand, _}
+import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTableUsingAsSelect, DescribeCommand}
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.sources.{CreateTableUsing, CreateTableUsingAsSelect, DescribeCommand}
import org.apache.spark.sql.types.StringType
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 71fa3e9..a47f9a4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 48d35a6..de63ee5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -37,6 +37,7 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.datasources.PartitionSpec
import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreTypes, HiveShim}
import org.apache.spark.sql.sources.{Filter, _}
import org.apache.spark.sql.types.StructType
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index d910af2..e403f32 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -28,12 +28,12 @@ import org.apache.hadoop.mapred.InvalidInputException
import org.apache.spark.Logging
import org.apache.spark.sql._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.parquet.ParquetRelation2
-import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index c9dd4c0..efb04bf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -22,11 +22,11 @@ import java.io._
import org.scalatest.{BeforeAndAfterAll, GivenWhenThen}
import org.apache.spark.{Logging, SparkFunSuite}
-import org.apache.spark.sql.sources.DescribeCommand
-import org.apache.spark.sql.execution.{SetCommand, ExplainCommand}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.{SetCommand, ExplainCommand}
+import org.apache.spark.sql.execution.datasources.DescribeCommand
import org.apache.spark.sql.hive.test.TestHive
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 05a1f00..0342826 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -23,12 +23,12 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.DefaultParserDialect
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.errors.DialectException
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.hive.{HiveContext, HiveQLDialect, MetastoreRelation}
import org.apache.spark.sql.parquet.ParquetRelation2
-import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.types._
case class Nested1(f1: Nested2)
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 9d79a4b..82a8daf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -23,12 +23,12 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql._
import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
+import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
-import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index afecf96..1cef83f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -17,10 +17,10 @@
package org.apache.spark.sql.sources
-import scala.collection.JavaConversions._
-
import java.io.File
+import scala.collection.JavaConversions._
+
import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -31,10 +31,12 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
+
abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
override lazy val sqlContext: SQLContext = TestHive
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org
[3/3] spark git commit: [SPARK-8906][SQL] Move all internal data
source classes into execution.datasources.
Posted by rx...@apache.org.
[SPARK-8906][SQL] Move all internal data source classes into execution.datasources.
This way, the sources package contains only public facing interfaces.
Author: Reynold Xin <rx...@databricks.com>
Closes #7565 from rxin/move-ds and squashes the following commits:
7661aff [Reynold Xin] Mima
9d5196a [Reynold Xin] Rearranged imports.
3dd7174 [Reynold Xin] [SPARK-8906][SQL] Move all internal data source classes into execution.datasources.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60c0ce13
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60c0ce13
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60c0ce13
Branch: refs/heads/master
Commit: 60c0ce134d90ef18852ed2c637d2f240b7f99ab9
Parents: 9ba7c64
Author: Reynold Xin <rx...@databricks.com>
Authored: Tue Jul 21 11:56:38 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Jul 21 11:56:38 2015 -0700
----------------------------------------------------------------------
project/MimaExcludes.scala | 47 ++
.../scala/org/apache/spark/sql/DataFrame.scala | 2 +-
.../org/apache/spark/sql/DataFrameReader.scala | 4 +-
.../org/apache/spark/sql/DataFrameWriter.scala | 2 +-
.../scala/org/apache/spark/sql/SQLContext.scala | 9 +-
.../spark/sql/execution/SparkStrategies.scala | 4 +-
.../spark/sql/execution/SqlNewHadoopRDD.scala | 263 +++++++++
.../datasources/DataSourceStrategy.scala | 394 +++++++++++++
.../execution/datasources/LogicalRelation.scala | 58 ++
.../datasources/PartitioningUtils.scala | 361 ++++++++++++
.../sql/execution/datasources/commands.scala | 582 +++++++++++++++++++
.../spark/sql/execution/datasources/ddl.scala | 492 ++++++++++++++++
.../spark/sql/execution/datasources/rules.scala | 158 +++++
.../apache/spark/sql/parquet/newParquet.scala | 5 +-
.../spark/sql/sources/DataSourceStrategy.scala | 395 -------------
.../spark/sql/sources/LogicalRelation.scala | 57 --
.../spark/sql/sources/PartitioningUtils.scala | 360 ------------
.../spark/sql/sources/SqlNewHadoopRDD.scala | 264 ---------
.../org/apache/spark/sql/sources/commands.scala | 581 ------------------
.../org/apache/spark/sql/sources/ddl.scala | 493 ----------------
.../org/apache/spark/sql/sources/filters.scala | 4 +
.../apache/spark/sql/sources/interfaces.scala | 4 +-
.../org/apache/spark/sql/sources/rules.scala | 158 -----
.../org/apache/spark/sql/json/JsonSuite.scala | 2 +-
.../spark/sql/parquet/ParquetFilterSuite.scala | 2 +-
.../ParquetPartitionDiscoverySuite.scala | 4 +-
.../sql/sources/CreateTableAsSelectSuite.scala | 1 +
.../sql/sources/ResolvedDataSourceSuite.scala | 1 +
.../org/apache/spark/sql/hive/HiveContext.scala | 6 +-
.../spark/sql/hive/HiveMetastoreCatalog.scala | 11 +-
.../org/apache/spark/sql/hive/HiveQl.scala | 2 +-
.../apache/spark/sql/hive/HiveStrategies.scala | 2 +-
.../spark/sql/hive/execution/commands.scala | 1 +
.../apache/spark/sql/hive/orc/OrcRelation.scala | 1 +
.../sql/hive/MetastoreDataSourcesSuite.scala | 2 +-
.../sql/hive/execution/HiveComparisonTest.scala | 4 +-
.../sql/hive/execution/SQLQuerySuite.scala | 2 +-
.../apache/spark/sql/hive/parquetSuites.scala | 2 +-
.../sql/sources/hadoopFsRelationSuites.scala | 6 +-
39 files changed, 2404 insertions(+), 2342 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a2595ff..fa36629 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -104,6 +104,53 @@ object MimaExcludes {
// SPARK-7422 add argmax for sparse vectors
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.mllib.linalg.Vector.argmax")
+ ) ++ Seq(
+ // SPARK-8906 Move all internal data source classes into execution.datasources
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.ResolvedDataSource"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreInsertCastAndRename$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsingAsSelect$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoDataSource$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopPartition"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$PartitionValues$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DefaultWriterContainer"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$PartitionValues"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.RefreshTable$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsing$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DynamicPartitionWriterContainer"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsingAsSelect"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreInsertCastAndRename"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.Partition$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.LogicalRelation$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.LogicalRelation"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.Partition"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.BaseWriterContainer"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreWriteCheck"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsing"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.RefreshTable"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DataSourceStrategy$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsing"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsingAsSelect$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsingAsSelect"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsing$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.ResolvedDataSource$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreWriteCheck$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoDataSource"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoHadoopFsRelation"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLParser"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CaseInsensitiveMap"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DataSourceStrategy"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLException")
)
case v if v.startsWith("1.4") =>
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 830fba3..323ff17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -38,8 +38,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, _}
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
+import org.apache.spark.sql.execution.datasources.CreateTableUsingAsSelect
import org.apache.spark.sql.json.JacksonGenerator
-import org.apache.spark.sql.sources.CreateTableUsingAsSelect
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index f1c1ddf..e9d782c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -20,16 +20,16 @@ package org.apache.spark.sql
import java.util.Properties
import org.apache.hadoop.fs.Path
-import org.apache.spark.{Logging, Partition}
+import org.apache.spark.{Logging, Partition}
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.json.JSONRelation
import org.apache.spark.sql.parquet.ParquetRelation2
-import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.types.StructType
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 3e7b9cd..ee0201a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -22,8 +22,8 @@ import java.util.Properties
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, ResolvedDataSource}
import org.apache.spark.sql.jdbc.{JDBCWriteDetails, JdbcUtils}
-import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 2dda3ad..8b4528b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -39,8 +39,9 @@ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, _}
-import org.apache.spark.sql.execution.{Filter, _}
-import org.apache.spark.sql.sources._
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -146,11 +147,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
new Analyzer(catalog, functionRegistry, conf) {
override val extendedResolutionRules =
ExtractPythonUDFs ::
- sources.PreInsertCastAndRename ::
+ PreInsertCastAndRename ::
Nil
override val extendedCheckRules = Seq(
- sources.PreWriteCheck(catalog)
+ datasources.PreWriteCheck(catalog)
)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 240332a..8cef7f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution
+import org.apache.spark.sql.{SQLContext, Strategy, execution}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
@@ -25,10 +26,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
+import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _}
import org.apache.spark.sql.parquet._
-import org.apache.spark.sql.sources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _}
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{SQLContext, Strategy, execution}
private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SQLContext#SparkPlanner =>
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala
new file mode 100644
index 0000000..e1c1a6c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+import org.apache.spark.{Partition => SparkPartition, _}
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.DataReadMethod
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
+import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
+import org.apache.spark.rdd.{HadoopRDD, RDD}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+
+import scala.reflect.ClassTag
+
+/**
+ * A Spark partition backed by one Hadoop `InputSplit` of the RDD identified by `rddId`.
+ *
+ * The raw split itself is marked transient; it is exposed through a `SerializableWritable`
+ * wrapper instead.
+ */
+private[spark] class SqlNewHadoopPartition(
+    rddId: Int,
+    val index: Int,
+    @transient rawSplit: InputSplit with Writable)
+  extends SparkPartition {
+
+  val serializableHadoopSplit = new SerializableWritable(rawSplit)
+
+  // Mixes the owning RDD's id with the partition index.
+  override def hashCode(): Int = {
+    val seed = 41
+    seed * (seed + rddId) + index
+  }
+}
+
+/**
+ * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
+ * sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`).
+ * It is based on [[org.apache.spark.rdd.NewHadoopRDD]]. It has three additions.
+ * 1. A shared broadcast Hadoop Configuration.
+ * 2. An optional closure `initDriverSideJobFuncOpt` that sets configurations at the driver side
+ * to the shared Hadoop Configuration.
+ * 3. An optional closure `initLocalJobFuncOpt` that sets configurations at both the driver side
+ * and the executor side to the shared Hadoop Configuration.
+ *
+ * Note: This RDD is basically a cloned version of [[org.apache.spark.rdd.NewHadoopRDD]] with
+ * changes based on [[org.apache.spark.rdd.HadoopRDD]]. In future, this functionality will be
+ * folded into core.
+ */
+private[sql] class SqlNewHadoopRDD[K, V](
+ @transient sc : SparkContext,
+ broadcastedConf: Broadcast[SerializableConfiguration],
+ @transient initDriverSideJobFuncOpt: Option[Job => Unit],
+ initLocalJobFuncOpt: Option[Job => Unit],
+ inputFormatClass: Class[_ <: InputFormat[K, V]],
+ keyClass: Class[K],
+ valueClass: Class[V])
+ extends RDD[(K, V)](sc, Nil)
+ with SparkHadoopMapReduceUtil
+ with Logging {
+
+  /**
+   * Creates a new `Job` from the broadcast Hadoop configuration and applies
+   * `initLocalJobFuncOpt` to it when that closure is defined.
+   *
+   * @return the newly created job
+   */
+  protected def getJob(): Job = {
+    val conf: Configuration = broadcastedConf.value.value
+    // "new Job" will make a copy of the conf. Then, it is
+    // safe to mutate conf properties with initLocalJobFuncOpt
+    // and initDriverSideJobFuncOpt.
+    val newJob = new Job(conf)
+    // The closure is run only for its effect on the job, so use foreach rather than map.
+    initLocalJobFuncOpt.foreach(f => f(newJob))
+    newJob
+  }
+
+  /**
+   * Returns the configuration of a freshly created job (see `getJob`).
+   *
+   * @param isDriverSide when true, `initDriverSideJobFuncOpt` is additionally applied to the
+   *                     job before its configuration is returned
+   * @return the configuration held by the new job
+   */
+  def getConf(isDriverSide: Boolean): Configuration = {
+    val job = getJob()
+    if (isDriverSide) {
+      // Run only for its effect on the job, so use foreach rather than map.
+      initDriverSideJobFuncOpt.foreach(f => f(job))
+    }
+    job.getConfiguration
+  }
+
+  // Construction-time timestamp at minute resolution (pattern "yyyyMMddHHmm"); it is used
+  // when building the Hadoop job id and the task attempt ids of this RDD.
+  private val jobTrackerId: String = new SimpleDateFormat("yyyyMMddHHmm").format(new Date())
+
+ @transient protected val jobId = new JobID(jobTrackerId, id)
+
+  /**
+   * Instantiates the input format, asks it for the input splits of this job, and wraps each
+   * split in a [[SqlNewHadoopPartition]] whose index is the split's position in that list.
+   */
+  override def getPartitions: Array[SparkPartition] = {
+    val driverConf = getConf(isDriverSide = true)
+    val inputFormat = inputFormatClass.newInstance
+    // A Configurable input format is handed the configuration before splits are requested.
+    inputFormat match {
+      case configurable: Configurable => configurable.setConf(driverConf)
+      case _ =>
+    }
+    val jobContext = newJobContext(driverConf, jobId)
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    Array.tabulate[SparkPartition](rawSplits.length) { i =>
+      new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+    }
+  }
+
+  /**
+   * Builds an iterator over the key/value records of one partition.
+   *
+   * A record reader is created and initialized for the partition's Hadoop split. The reader is
+   * closed by a task-completion listener, not when the iterator is exhausted.
+   *
+   * @param theSplit the partition to read; it is cast to [[SqlNewHadoopPartition]]
+   * @param context the context of the running task, used for input metrics and cleanup
+   * @return an interruptible iterator over the (key, value) pairs produced by the reader
+   */
+  override def compute(
+      theSplit: SparkPartition,
+      context: TaskContext): InterruptibleIterator[(K, V)] = {
+    val iter = new Iterator[(K, V)] {
+      val split = theSplit.asInstanceOf[SqlNewHadoopPartition]
+      logInfo("Input split: " + split.serializableHadoopSplit)
+      val conf = getConf(isDriverSide = false)
+
+      val inputMetrics = context.taskMetrics
+        .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+
+      // Find a function that will return the FileSystem bytes read by this thread. Do this before
+      // creating RecordReader, because RecordReader's constructor might read some bytes
+      val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
+        split.serializableHadoopSplit.value match {
+          case _: FileSplit | _: CombineFileSplit =>
+            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+          case _ => None
+        }
+      }
+      inputMetrics.setBytesReadCallback(bytesReadCallback)
+
+      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
+      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
+      val format = inputFormatClass.newInstance
+      format match {
+        case configurable: Configurable =>
+          configurable.setConf(conf)
+        case _ =>
+      }
+      val reader = format.createRecordReader(
+        split.serializableHadoopSplit.value, hadoopAttemptContext)
+      reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+
+      // Register an on-task-completion callback to close the input stream.
+      context.addTaskCompletionListener(_ => close())
+      // True while the reader is positioned on a pair that next() has not yet handed out.
+      var havePair = false
+      // True once the reader has reported that there are no more pairs.
+      var finished = false
+
+      override def hasNext: Boolean = {
+        // Only advance the reader when the current pair has already been consumed.
+        if (!finished && !havePair) {
+          finished = !reader.nextKeyValue
+          havePair = !finished
+        }
+        !finished
+      }
+
+      override def next(): (K, V) = {
+        if (!hasNext) {
+          throw new java.util.NoSuchElementException("End of stream")
+        }
+        havePair = false
+        if (!finished) {
+          inputMetrics.incRecordsRead(1)
+        }
+        (reader.getCurrentKey, reader.getCurrentValue)
+      }
+
+      // Closes the reader and records the bytes read. Failures are logged, not rethrown.
+      private def close(): Unit = {
+        try {
+          reader.close()
+          if (bytesReadCallback.isDefined) {
+            inputMetrics.updateBytesRead()
+          } else {
+            split.serializableHadoopSplit.value match {
+              case _: FileSplit | _: CombineFileSplit =>
+                // If we can't get the bytes read from the FS stats, fall back to the split size,
+                // which may be inaccurate.
+                try {
+                  inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+                } catch {
+                  case e: java.io.IOException =>
+                    logWarning("Unable to get input size to set InputMetrics for task", e)
+                }
+              case _ =>
+            }
+          }
+        } catch {
+          case e: Exception =>
+            if (!Utils.inShutdown()) {
+              logWarning("Exception in RecordReader.close()", e)
+            }
+        }
+      }
+    }
+    new InterruptibleIterator(context, iter)
+  }
+
+  /**
+   * Maps over a partition, providing the InputSplit that was used as the base of the partition.
+   *
+   * @param f function applied to each partition's input split and its record iterator
+   * @param preservesPartitioning passed through to the resulting RDD, which keeps this RDD's
+   *                              partitioner only when it is true
+   * @return an RDD of the values produced by `f`
+   */
+  @DeveloperApi
+  def mapPartitionsWithInputSplit[U: ClassTag](
+      f: (InputSplit, Iterator[(K, V)]) => Iterator[U],
+      preservesPartitioning: Boolean = false): RDD[U] = {
+    // Use the companion object's RDD: its compute() casts partitions to SqlNewHadoopPartition,
+    // which is the partition type this RDD creates. The same-named class imported from
+    // NewHadoopRDD presumably casts to NewHadoopRDD's own partition type and would fail here.
+    // NOTE(review): confirm against NewHadoopRDD.
+    new SqlNewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
+  }
+
+  /**
+   * Returns the preferred hosts for a partition.
+   *
+   * When `HadoopRDD.SPLIT_INFO_REFLECTIONS` is defined, the locations are obtained reflectively
+   * from the split's location info. Otherwise, or if that call throws, the split's own
+   * locations are used with "localhost" entries removed.
+   */
+  override def getPreferredLocations(hsplit: SparkPartition): Seq[String] = {
+    val inputSplit = hsplit.asInstanceOf[SqlNewHadoopPartition].serializableHadoopSplit.value
+    val reflectedLocations = HadoopRDD.SPLIT_INFO_REFLECTIONS.flatMap { reflection =>
+      try {
+        val infos = reflection.newGetLocationInfo.invoke(inputSplit).asInstanceOf[Array[AnyRef]]
+        Some(HadoopRDD.convertSplitLocationInfo(infos))
+      } catch {
+        case e: Exception =>
+          logDebug("Failed to use InputSplit#getLocationInfo.", e)
+          None
+      }
+    }
+    reflectedLocations.getOrElse(inputSplit.getLocations.filter(_ != "localhost"))
+  }
+
+  /**
+   * Delegates to the parent `persist`, first logging a warning when the requested storage
+   * level keeps records as deserialized objects.
+   */
+  override def persist(storageLevel: StorageLevel): this.type = {
+    if (storageLevel.deserialized) {
+      val warning =
+        "Caching NewHadoopRDDs as deserialized objects usually leads to undesired" +
+        " behavior because Hadoop's RecordReader reuses the same Writable object for all records." +
+        " Use a map transformation to make copies of the records."
+      logWarning(warning)
+    }
+    super.persist(storageLevel)
+  }
+}
+
+/** Holds a helper RDD that maps over partitions together with their Hadoop input splits. */
+private[spark] object SqlNewHadoopRDD {
+  /**
+   * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
+   * the given function rather than the index of the partition.
+   */
+  private[spark] class NewHadoopMapPartitionsWithSplitRDD[U: ClassTag, T: ClassTag](
+      prev: RDD[T],
+      f: (InputSplit, Iterator[T]) => Iterator[U],
+      preservesPartitioning: Boolean = false)
+    extends RDD[U](prev) {
+
+    // The parent's partitioner is reused only when preservesPartitioning is set.
+    override val partitioner =
+      if (preservesPartitioning) firstParent[T].partitioner else None
+
+    // This RDD has exactly the partitions of its parent.
+    override def getPartitions: Array[SparkPartition] = firstParent[T].partitions
+
+    override def compute(split: SparkPartition, context: TaskContext): Iterator[U] = {
+      // The partition is cast to SqlNewHadoopPartition to reach its Hadoop split.
+      val hadoopSplit = split.asInstanceOf[SqlNewHadoopPartition].serializableHadoopSplit.value
+      val parentRecords = firstParent[T].iterator(split, context)
+      f(hadoopSplit, parentRecords)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
new file mode 100644
index 0000000..2b40092
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.{Logging, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.sql.{SaveMode, Strategy, execution, sources, _}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+
+/**
+ * A Strategy for planning scans over data sources defined using the sources API.
+ */
+private[sql] object DataSourceStrategy extends Strategy with Logging {
+ /**
+  * Plans logical plans that read from, or insert into, a data source [[LogicalRelation]].
+  *
+  * Cases are tried in order and the first match wins, so the partitioned `HadoopFsRelation`
+  * case must stay ahead of the non-partitioned one.
+  *
+  * @param plan the logical plan to translate
+  * @return a single-element list with the physical plan, or `Nil` when this strategy does not
+  *         apply to `plan`
+  */
+ def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
+   case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan)) =>
+     pruneFilterProjectRaw(
+       l,
+       projects,
+       filters,
+       (a, f) => toCatalystRDD(l, a, t.buildScan(a, f))) :: Nil
+
+   case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan)) =>
+     pruneFilterProject(
+       l,
+       projects,
+       filters,
+       (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil
+
+   case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan)) =>
+     pruneFilterProject(
+       l,
+       projects,
+       filters,
+       (a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil
+
+   // Scanning partitioned HadoopFsRelation
+   case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation))
+       if t.partitionSpec.partitionColumns.nonEmpty =>
+     val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
+
+     logInfo {
+       val total = t.partitionSpec.partitions.length
+       val selected = selectedPartitions.length
+       // A spec may have partition columns but no partitions yet; avoid reporting "NaN%".
+       val percentPruned =
+         if (total == 0) 0.0 else (1 - selected.toDouble / total.toDouble) * 100
+       s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
+     }
+
+     // Only pushes down predicates that do not reference partition columns.
+     val pushedFilters = {
+       val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet
+       filters.filter { f =>
+         val referencedColumnNames = f.references.map(_.name).toSet
+         referencedColumnNames.intersect(partitionColumnNames).isEmpty
+       }
+     }
+
+     buildPartitionedTableScan(
+       l,
+       projects,
+       pushedFilters,
+       t.partitionSpec.partitionColumns,
+       selectedPartitions) :: Nil
+
+   // Scanning non-partitioned HadoopFsRelation
+   case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
+     // See buildPartitionedTableScan for the reason that we need to create a shared
+     // broadcast HadoopConf.
+     val sharedHadoopConf = SparkHadoopUtil.get.conf
+     val confBroadcast =
+       t.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
+     pruneFilterProject(
+       l,
+       projects,
+       filters,
+       (a, f) =>
+         toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f, t.paths, confBroadcast))) :: Nil
+
+   case l @ LogicalRelation(t: TableScan) =>
+     execution.PhysicalRDD(l.output, toCatalystRDD(l, t.buildScan())) :: Nil
+
+   // Inserting into an InsertableRelation is only planned when no partition is specified.
+   case logical.InsertIntoTable(
+       l @ LogicalRelation(_: InsertableRelation), part, query, overwrite, false) if part.isEmpty =>
+     execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
+
+   case logical.InsertIntoTable(
+       LogicalRelation(t: HadoopFsRelation), _, query, overwrite, false) =>
+     val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
+     execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil
+
+   case _ => Nil
+ }
+
+ /**
+  * Builds the physical scan of a partitioned HadoopFsRelation: every selected partition
+  * directory is scanned on its own, its rows are merged with the partition's values, and the
+  * per-partition results are unioned.
+  */
+ private def buildPartitionedTableScan(
+     logicalRelation: LogicalRelation,
+     projections: Seq[NamedExpression],
+     filters: Seq[Expression],
+     partitionColumns: StructType,
+     partitions: Array[Partition]) = {
+   val fsRelation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]
+
+   // One RDD is created per partition, so the HadoopConf is broadcast once and shared;
+   // broadcasting it again for every RDD would be costly.
+   val hadoopConf = SparkHadoopUtil.get.conf
+   val broadcastedConf =
+     fsRelation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+   // Plans and executes one scan for each selected partition.
+   val rowsPerPartition = partitions.map { partition =>
+     val Partition(partitionValues, dir) = partition
+
+     // The scan operator (PhysicalRDD) reads the required columns from the data files. The
+     // data file schema (`relation.dataSchema`) may itself contain partition column(s).
+     val partitionScan =
+       pruneFilterProject(
+         logicalRelation,
+         projections,
+         filters,
+         (columns: Seq[Attribute], sourceFilters) => {
+           val partitionNames = partitionColumns.fieldNames
+
+           // Partition columns are never read from the data files, to save I/O. This
+           // optimistically assumes their stored values always agree with the values encoded
+           // in the partition directory path.
+           val dataColumns = columns.filterNot(c => partitionNames.contains(c.name))
+           val scannedRows = fsRelation.buildScan(
+             dataColumns.map(_.name).toArray, sourceFilters, Array(dir), broadcastedConf)
+
+           // Combine the scanned data values with this partition's values.
+           mergeWithPartitionValues(
+             fsRelation.schema,
+             columns.map(_.name).toArray,
+             partitionNames,
+             partitionValues,
+             toCatalystRDD(logicalRelation, dataColumns, scannedRows))
+         })
+
+     partitionScan.execute()
+   }
+
+   val allRows =
+     if (rowsPerPartition.isEmpty) {
+       fsRelation.sqlContext.emptyResult
+     } else {
+       new UnionRDD(fsRelation.sqlContext.sparkContext, rowsPerPartition)
+     }
+
+   execution.PhysicalRDD(projections.map(_.toAttribute), allRows)
+ }
+
+ /**
+ * Produces rows laid out as `requiredColumns`: values of partition columns are taken from
+ * `partitionValues`, all other values are taken from `dataRows`. When no required column is a
+ * partition column, `dataRows` is returned unchanged.
+ *
+ * @param schema schema used to look up the data type of each required column by name
+ * @param requiredColumns names of the output columns, in output order
+ * @param partitionColumns names of the partition columns; a name's index here is used to
+ * index into `partitionValues`
+ * @param partitionValues values of the partition columns for the partition being scanned
+ * @param dataRows scanned rows; a non-partition column is read at its position among the
+ * non-partition entries of `requiredColumns`
+ */
+ private def mergeWithPartitionValues(
+ schema: StructType,
+ requiredColumns: Array[String],
+ partitionColumns: Array[String],
+ partitionValues: InternalRow,
+ dataRows: RDD[InternalRow]): RDD[InternalRow] = {
+ val nonPartitionColumns = requiredColumns.filterNot(partitionColumns.contains)
+
+ // If output columns contain any partition column(s), we need to merge scanned data
+ // columns and requested partition columns to form the final result.
+ if (!requiredColumns.sameElements(nonPartitionColumns)) {
+ // One setter per output column, built once so the per-row loop below does no name lookups.
+ val mergers = requiredColumns.zipWithIndex.map { case (name, index) =>
+ // To see whether the `index`-th column is a partition column...
+ val i = partitionColumns.indexOf(name)
+ if (i != -1) {
+ // If yes, gets column value from partition values.
+ (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
+ mutableRow(ordinal) = partitionValues(i)
+ }
+ } else {
+ // Otherwise, inherits the value from scanned data.
+ val i = nonPartitionColumns.indexOf(name)
+ (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
+ mutableRow(ordinal) = dataRow(i)
+ }
+ }
+ }
+
+ // Since we know for sure that this closure is serializable, we can avoid the overhead
+ // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally
+ // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718).
+ val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[InternalRow]) => {
+ val dataTypes = requiredColumns.map(schema(_).dataType)
+ // A single mutable row is created per invocation and returned for every input row, so
+ // consumers see the same object with updated contents.
+ val mutableRow = new SpecificMutableRow(dataTypes)
+ iterator.map { dataRow =>
+ var i = 0
+ while (i < mutableRow.length) {
+ mergers(i)(mutableRow, dataRow, i)
+ i += 1
+ }
+ mutableRow.asInstanceOf[InternalRow]
+ }
+ }
+
+ // This is an internal RDD whose call site the user should not be concerned with.
+ // Since we create many of these (one per partition), the time spent on computing
+ // the call site may add up.
+ Utils.withDummyCallSite(dataRows.sparkContext) {
+ new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false)
+ }
+
+ } else {
+ dataRows
+ }
+ }
+
+ /**
+  * Returns the partitions of `partitionSpec` whose values satisfy all predicates that
+  * reference partition columns only. Predicates referencing any other column are not used
+  * for pruning; when no predicate qualifies, all partitions are returned.
+  */
+ protected def prunePartitions(
+     predicates: Seq[Expression],
+     partitionSpec: PartitionSpec): Seq[Partition] = {
+   val PartitionSpec(columns, allPartitions) = partitionSpec
+   val columnNames = columns.map(_.name).toSet
+   val pruningPredicates = predicates.filter { p =>
+     p.references.map(_.name).toSet.subsetOf(columnNames)
+   }
+
+   if (pruningPredicates.isEmpty) {
+     allPartitions
+   } else {
+     val conjunction =
+       pruningPredicates
+         .reduceOption(expressions.And)
+         .getOrElse(Literal(true))
+
+     // Bind every attribute to the ordinal of its partition column so the predicate can be
+     // evaluated directly against a partition's value row.
+     val bound = conjunction.transform {
+       case attr: AttributeReference =>
+         val ordinal = columns.indexWhere(attr.name == _.name)
+         BoundReference(ordinal, columns(ordinal).dataType, nullable = true)
+     }
+     val keep = InterpretedPredicate.create(bound)
+
+     allPartitions.filter { case Partition(values, _) => keep(values) }
+   }
+ }
+
+ // Variant for the public data source API: Catalyst predicates are translated into data
+ // source Filters (through selectFilters) before `scanBuilder` is invoked.
+ protected def pruneFilterProject(
+     relation: LogicalRelation,
+     projects: Seq[NamedExpression],
+     filterPredicates: Seq[Expression],
+     scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow]) = {
+   val catalystScanBuilder = (columns: Seq[Attribute], predicates: Seq[Expression]) =>
+     scanBuilder(columns, selectFilters(predicates).toArray)
+
+   pruneFilterProjectRaw(relation, projects, filterPredicates, catalystScanBuilder)
+ }
+
+ // Variant working on Catalyst expressions: builds the scan through `scanBuilder`, then adds
+ // a Filter for the conjunction of `filterPredicates` and, if needed, a Project.
+ protected def pruneFilterProjectRaw(
+     relation: LogicalRelation,
+     projects: Seq[NamedExpression],
+     filterPredicates: Seq[Expression],
+     scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[InternalRow]) = {
+
+   val projectedAttrs = AttributeSet(projects.flatMap(_.references))
+   val filteredAttrs = AttributeSet(filterPredicates.flatMap(_.references))
+   val combinedFilter = filterPredicates.reduceLeftOption(expressions.And)
+
+   // Predicates handed to the scan use the relation's own attributes, so that attribute
+   // names keep their original case.
+   val filtersToPush = filterPredicates.map { predicate =>
+     predicate.transform {
+       case attr: AttributeReference => relation.attributeMap(attr)
+     }
+   }
+
+   // True when the projection is a plain list of distinct attributes that already covers
+   // every attribute the filters need: column pruning alone yields the right output.
+   val pruningIsEnough =
+     projects.map(_.toAttribute) == projects &&
+       projectedAttrs.size == projects.size &&
+       filteredAttrs.subsetOf(projectedAttrs)
+
+   if (pruningIsEnough) {
+     // Scan followed by an optional filter, no extra project. The cast is safe because of
+     // the check above.
+     val requestedColumns =
+       projects.asInstanceOf[Seq[Attribute]].map(relation.attributeMap)
+
+     val scan = execution.PhysicalRDD(projects.map(_.toAttribute),
+       scanBuilder(requestedColumns, filtersToPush))
+     combinedFilter match {
+       case Some(condition) => execution.Filter(condition, scan)
+       case None => scan
+     }
+   } else {
+     val requestedColumns = (projectedAttrs ++ filteredAttrs).map(relation.attributeMap).toSeq
+
+     val scan = execution.PhysicalRDD(requestedColumns,
+       scanBuilder(requestedColumns, filtersToPush))
+     val filtered = combinedFilter match {
+       case Some(condition) => execution.Filter(condition, scan)
+       case None => scan
+     }
+     execution.Project(projects, filtered)
+   }
+ }
+
+ /**
+  * Turns the RDD[Row] produced by a relation into an RDD[InternalRow]. When the relation
+  * reports `needConversion`, rows are converted using the data types of `output`; otherwise
+  * each row is only cast to InternalRow.
+  */
+ private[this] def toCatalystRDD(
+     relation: LogicalRelation,
+     output: Seq[Attribute],
+     rdd: RDD[Row]): RDD[InternalRow] = {
+   val mustConvert = relation.relation.needConversion
+   if (!mustConvert) {
+     rdd.map(_.asInstanceOf[InternalRow])
+   } else {
+     execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
+   }
+ }
+
+ /**
+  * Converts an RDD of Row into an RDD of InternalRow, using the relation's full output
+  * attributes to drive the conversion.
+  */
+ private[this] def toCatalystRDD(relation: LogicalRelation, rdd: RDD[Row]): RDD[InternalRow] =
+   toCatalystRDD(relation, relation.output, rdd)
+
+ /**
+  * Selects Catalyst predicate [[Expression]]s which are convertible into data source
+  * [[Filter]]s, and converts them. Predicates that cannot be converted are dropped.
+  *
+  * A nested `And` is converted only when BOTH sides are convertible. Converting just one side
+  * is unsafe once the conjunction sits under a `Not`: `NOT(a = 1 AND <unconvertible>)` would
+  * become `NOT(a = 1)`, which rejects rows the original predicate accepts.
+  *
+  * @param filters Catalyst predicates to translate
+  * @return the data source filters for the predicates that could be translated
+  */
+ protected[sql] def selectFilters(filters: Seq[Expression]): Seq[Filter] = {
+   def translate(predicate: Expression): Option[Filter] = predicate match {
+     case expressions.EqualTo(a: Attribute, Literal(v, _)) =>
+       Some(sources.EqualTo(a.name, v))
+     case expressions.EqualTo(Literal(v, _), a: Attribute) =>
+       Some(sources.EqualTo(a.name, v))
+
+     // For comparisons with the literal on the left, the operator is flipped.
+     case expressions.GreaterThan(a: Attribute, Literal(v, _)) =>
+       Some(sources.GreaterThan(a.name, v))
+     case expressions.GreaterThan(Literal(v, _), a: Attribute) =>
+       Some(sources.LessThan(a.name, v))
+
+     case expressions.LessThan(a: Attribute, Literal(v, _)) =>
+       Some(sources.LessThan(a.name, v))
+     case expressions.LessThan(Literal(v, _), a: Attribute) =>
+       Some(sources.GreaterThan(a.name, v))
+
+     case expressions.GreaterThanOrEqual(a: Attribute, Literal(v, _)) =>
+       Some(sources.GreaterThanOrEqual(a.name, v))
+     case expressions.GreaterThanOrEqual(Literal(v, _), a: Attribute) =>
+       Some(sources.LessThanOrEqual(a.name, v))
+
+     case expressions.LessThanOrEqual(a: Attribute, Literal(v, _)) =>
+       Some(sources.LessThanOrEqual(a.name, v))
+     case expressions.LessThanOrEqual(Literal(v, _), a: Attribute) =>
+       Some(sources.GreaterThanOrEqual(a.name, v))
+
+     case expressions.InSet(a: Attribute, set) =>
+       Some(sources.In(a.name, set.toArray))
+
+     case expressions.IsNull(a: Attribute) =>
+       Some(sources.IsNull(a.name))
+     case expressions.IsNotNull(a: Attribute) =>
+       Some(sources.IsNotNull(a.name))
+
+     case expressions.And(left, right) =>
+       // Both sides are required; see the method documentation for why a partial
+       // translation would be incorrect.
+       for {
+         leftFilter <- translate(left)
+         rightFilter <- translate(right)
+       } yield sources.And(leftFilter, rightFilter)
+
+     case expressions.Or(left, right) =>
+       for {
+         leftFilter <- translate(left)
+         rightFilter <- translate(right)
+       } yield sources.Or(leftFilter, rightFilter)
+
+     case expressions.Not(child) =>
+       translate(child).map(sources.Not)
+
+     case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
+       Some(sources.StringStartsWith(a.name, v.toString))
+
+     case expressions.EndsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
+       Some(sources.StringEndsWith(a.name, v.toString))
+
+     case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) =>
+       Some(sources.StringContains(a.name, v.toString))
+
+     case _ => None
+   }
+
+   filters.flatMap(translate)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
new file mode 100644
index 0000000..a7123dc
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.sources.BaseRelation
+
+/**
+ * A leaf logical plan node wrapping a [[BaseRelation]] so that it can take part in a logical
+ * query plan.
+ */
+private[sql] case class LogicalRelation(relation: BaseRelation)
+  extends LeafNode
+  with MultiInstanceRelation {
+
+  override val output: Seq[AttributeReference] = relation.schema.toAttributes
+
+  // For the sake of transformations, two LogicalRelations are equal only when both the
+  // wrapped relation and the output attributes are equal.
+  override def equals(other: Any): Boolean = other match {
+    case that: LogicalRelation => relation == that.relation && output == that.output
+    case _ => false
+  }
+
+  override def hashCode: Int = com.google.common.base.Objects.hashCode(relation, output)
+
+  // Output attributes are not compared here; only the wrapped relation is.
+  override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match {
+    case that: LogicalRelation => relation == that.relation
+    case _ => false
+  }
+
+  @transient override lazy val statistics: Statistics =
+    Statistics(sizeInBytes = BigInt(relation.sizeInBytes))
+
+  /** Maps every output attribute to itself; used to look up original attribute capitalization. */
+  val attributeMap: AttributeMap[AttributeReference] =
+    AttributeMap(output.map(attr => attr -> attr))
+
+  def newInstance(): this.type = LogicalRelation(relation).asInstanceOf[this.type]
+
+  override def simpleString: String = s"Relation[${output.mkString(",")}] $relation"
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
new file mode 100644
index 0000000..6b4a359
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.lang.{Double => JDouble, Long => JLong}
+import java.math.{BigDecimal => JBigDecimal}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.Shell
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
+import org.apache.spark.sql.types._
+
+
+/**
+ * A single partition: the row of its partition column values and the path it was parsed from
+ * (stored as a string).
+ */
+private[sql] case class Partition(values: InternalRow, path: String)
+
+/**
+ * The partition columns of a dataset together with the partitions found for it.
+ */
+private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition])
+
+private[sql] object PartitionSpec {
+ // Spec with no partition columns and no partitions.
+ val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[Partition])
+}
+
+private[sql] object PartitioningUtils {
+ // This duplicates the default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core
+ // doesn't depend on Hive.
+ // NOTE(review): presumably passed as `defaultPartitionName` to the parsing methods of this
+ // object; confirm against callers.
+ private[sql] val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"
+
+ /**
+ * Partition column names and their values, as literals. Both sequences must have the same
+ * size; construction fails otherwise.
+ */
+ private[sql] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) {
+ require(columnNames.size == literals.size)
+ }
+
+ /**
+  * Parses a group of qualified paths into a partition specification. Paths from which no
+  * partition value can be parsed are ignored; if none can be parsed, the empty spec is
+  * returned. For example, given:
+  * {{{
+  *   hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14
+  *   hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28
+  * }}}
+  * and type inference enabled, the result has the partition columns
+  * {{{
+  *   StructType(
+  *     StructField(name = "a", dataType = IntegerType, nullable = true),
+  *     StructField(name = "b", dataType = StringType, nullable = true),
+  *     StructField(name = "c", dataType = DoubleType, nullable = true))
+  * }}}
+  * and one `Partition` per path, holding that path's values (e.g. `1, "hello", 3.14`) and the
+  * path itself as a string.
+  */
+ private[sql] def parsePartitions(
+     paths: Seq[Path],
+     defaultPartitionName: String,
+     typeInference: Boolean): PartitionSpec = {
+   // Keep only the paths that yield partition values, paired with those values.
+   val parsed = paths.flatMap { path =>
+     parsePartition(path, defaultPartitionName, typeInference).map(values => path -> values)
+   }
+
+   if (parsed.isEmpty) {
+     // Nothing could be parsed: the dataset is not partitioned.
+     PartitionSpec.emptySpec
+   } else {
+     // Checks that all partitions share the same columns and resolves type conflicts.
+     val resolved = resolvePartitions(parsed)
+
+     // Partition columns are always nullable, since null values may be appended later.
+     val first = resolved.head
+     val fields = first.columnNames.zip(first.literals).map { case (name, literal) =>
+       StructField(name, literal.dataType, nullable = true)
+     }
+
+     // Pair each resolved set of values with the path it came from.
+     val partitions = resolved.zip(parsed).map { case (values, (path, _)) =>
+       Partition(InternalRow.fromSeq(values.literals.map(_.value)), path.toString)
+     }
+
+     PartitionSpec(StructType(fields), partitions)
+   }
+ }
+
+ /**
+  * Parses a single partition path into the names and values of its partition columns. For
+  * example, given
+  * {{{
+  *   path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
+  * }}}
+  * and type inference enabled, it returns
+  * {{{
+  *   PartitionValues(
+  *     Seq("a", "b", "c"),
+  *     Seq(
+  *       Literal.create(42, IntegerType),
+  *       Literal.create("hello", StringType),
+  *       Literal.create(3.14, DoubleType)))
+  * }}}
+  * Returns `None` when no partition column is found, or when a `_temporary` directory is
+  * encountered while walking up the path.
+  */
+ private[sql] def parsePartition(
+     path: Path,
+     defaultPartitionName: String,
+     typeInference: Boolean): Option[PartitionValues] = {
+   // Walks from `dir` towards the root, prepending every parsed column so the accumulated
+   // list ends up ordered from the outermost directory to the innermost one.
+   @scala.annotation.tailrec
+   def walkUp(dir: Path, parsed: List[(String, Literal)]): Option[List[(String, Literal)]] = {
+     if (dir.getParent == null) {
+       // Reached the root. (Old Hadoop versions don't have `Path.isRoot`.)
+       Some(parsed)
+     } else if (dir.getName.toLowerCase == "_temporary") {
+       // Temporary directories may be left uncleaned (e.g., when speculative tasks are
+       // enabled); such paths are simply ignored.
+       None
+     } else {
+       parsePartitionColumn(dir.getName, defaultPartitionName, typeInference) match {
+         case Some(column) => walkUp(dir.getParent, column :: parsed)
+         // The first directory that is not of the form `name=value` ends the walk.
+         case None => Some(parsed)
+       }
+     }
+   }
+
+   walkUp(path, Nil).filter(_.nonEmpty).map { columns =>
+     val (columnNames, values) = columns.unzip
+     PartitionValues(columnNames, values)
+   }
+ }
+
+ /**
+  * Parses one path component of the form `name=value` into the column name and its value
+  * literal. Returns `None` when the component contains no `=`; an empty name or value fails
+  * an assertion.
+  */
+ private def parsePartitionColumn(
+     columnSpec: String,
+     defaultPartitionName: String,
+     typeInference: Boolean): Option[(String, Literal)] = {
+   columnSpec.indexOf('=') match {
+     case -1 =>
+       None
+
+     case separatorAt =>
+       // Only the first '=' separates name and value; later ones belong to the value.
+       val columnName = columnSpec.substring(0, separatorAt)
+       assert(columnName.nonEmpty, s"Empty partition column name in '$columnSpec'")
+
+       val rawColumnValue = columnSpec.substring(separatorAt + 1)
+       assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")
+
+       Some(columnName ->
+         inferPartitionColumnValue(rawColumnValue, defaultPartitionName, typeInference))
+   }
+ }
+
+ /**
+  * Resolves possible type conflicts between partitions, column by column, by up-casting
+  * "lower" types (the ordering is the one defined by `upCastingOrder`). All partitions must
+  * have the same list of partition column names, otherwise an assertion fails with a message
+  * listing the conflicting paths.
+  */
+ private[sql] def resolvePartitions(
+     pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
+   if (pathsWithPartitionValues.nonEmpty) {
+     val allValues = pathsWithPartitionValues.map { case (_, values) => values }
+
+     val columnNameLists = allValues.map(_.columnNames).distinct
+     assert(
+       columnNameLists.size == 1,
+       listConflictingPartitionColumns(pathsWithPartitionValues))
+
+     // For every column, the literals of all partitions after conflict resolution.
+     val resolvedByColumn = allValues.head.columnNames.indices.map { column =>
+       resolveTypeConflicts(allValues.map(_.literals(column)))
+     }
+
+     // Put the resolved literals back into each partition.
+     allValues.zipWithIndex.map { case (values, row) =>
+       values.copy(literals = resolvedByColumn.map(resolvedColumn => resolvedColumn(row)))
+     }
+   } else {
+     Seq.empty
+   }
+ }
+
+ /**
+  * Builds the error message reported when partitions disagree on their partition column
+  * names: it lists every distinct column name list, followed by the paths of the partitions,
+  * grouped by column name list with the shortest lists first.
+  */
+ private[sql] def listConflictingPartitionColumns(
+     pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = {
+   val columnNameLists = pathWithPartitionValues.map(_._2.columnNames).distinct
+
+   // Paths of the partitions, keyed by their list of partition column names.
+   val pathsByColumnNames =
+     pathWithPartitionValues
+       .groupBy { case (_, partValues) => partValues.columnNames }
+       .mapValues(_.map { case (path, _) => path })
+
+   val columnListing = columnNameLists.zipWithIndex.map { case (names, index) =>
+     s"Partition column name list #$index: ${names.mkString(", ")}"
+   }
+
+   // Lists out those non-leaf partition directories that also contain files
+   val suspiciousPaths = columnNameLists.sortBy(_.length).flatMap(pathsByColumnNames)
+
+   val header = s"Conflicting partition column names detected:\n"
+   val advice =
+     "For partitioned table directories, data files should only live in leaf directories.\n" +
+       "And directories at the same level should have the same partition column name.\n" +
+       "Please check the following directories for unexpected files or " +
+       "inconsistent partition column names:\n"
+
+   header +
+     columnListing.mkString("\n\t", "\n\t", "\n\n") +
+     advice +
+     suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
+ }
+
+ /**
+  * Converts a raw partition value string to a [[Literal]]. With `typeInference` enabled the
+  * value is tried, in order, as [[IntegerType]], [[LongType]], [[DoubleType]] and
+  * [[DecimalType.Unlimited]]. If none of these parse, or if inference is disabled, the value
+  * becomes a null literal when it equals `defaultPartitionName`, and an unescaped
+  * [[StringType]] literal otherwise.
+  */
+ private[sql] def inferPartitionColumnValue(
+     raw: String,
+     defaultPartitionName: String,
+     typeInference: Boolean): Literal = {
+   // Non-numeric interpretation, shared by both modes.
+   def nullOrString: Literal =
+     if (raw == defaultPartitionName) {
+       Literal.create(null, NullType)
+     } else {
+       Literal.create(unescapePathName(raw), StringType)
+     }
+
+   if (!typeInference) {
+     nullOrString
+   } else {
+     // Integral types first, then fractional types, then the string fallback.
+     val asInt = Try(Literal.create(Integer.parseInt(raw), IntegerType))
+     asInt
+       .orElse(Try(Literal.create(JLong.parseLong(raw), LongType)))
+       .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
+       .orElse(Try(Literal.create(new JBigDecimal(raw), DecimalType.Unlimited)))
+       .getOrElse(nullOrString)
+   }
+ }
+
+ // Types ordered from "lowest" to "highest". When literals of one column disagree on their
+ // type, resolveTypeConflicts picks the type with the greatest index in this sequence.
+ private val upCastingOrder: Seq[DataType] =
+ Seq(NullType, IntegerType, LongType, FloatType, DoubleType, DecimalType.Unlimited, StringType)
+
+ /**
+  * Casts all given [[Literal]]s to one common type: the "highest" of their types according
+  * to `upCastingOrder`, or [[StringType]] when that type is [[NullType]].
+  */
+ private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = {
+   val widestType = literals.map(_.dataType).maxBy(dataType => upCastingOrder.indexOf(dataType))
+   // Falls back to string if all values of this column are null or empty string
+   val targetType = if (widestType == NullType) StringType else widestType
+
+   literals.map { literal =>
+     Literal.create(Cast(literal, targetType).eval(), targetType)
+   }
+ }
+
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+ // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils).
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+
+ // Set of characters that must be escaped in path names, indexed by character code.
+ val charToEscape = {
+   /**
+    * ASCII 01-1F are HTTP control characters that need to be escaped; newline and carriage
+    * return are written as '\n' and '\r' below.
+    */
+   val alwaysEscaped = Array(
+     '\u0001', '\u0002', '\u0003', '\u0004', '\u0005', '\u0006', '\u0007', '\u0008', '\u0009',
+     '\n', '\u000B', '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', '\u0013',
+     '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', '\u001A', '\u001B', '\u001C',
+     '\u001D', '\u001E', '\u001F', '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F',
+     '{', '[', ']', '^')
+
+   // Additionally escaped when running on Windows.
+   val escapedOnWindows = Array(' ', '<', '>', '|')
+
+   val escapeSet = new java.util.BitSet(128)
+   alwaysEscaped.foreach(c => escapeSet.set(c))
+   if (Shell.WINDOWS) {
+     escapedOnWindows.foreach(c => escapeSet.set(c))
+   }
+
+   escapeSet
+ }
+
+ /** Tells whether `c` is one of the characters marked in `charToEscape`. */
+ def needsEscaping(c: Char): Boolean = {
+   val code = c.toInt
+   val withinTable = code >= 0 && code < charToEscape.size()
+   withinTable && charToEscape.get(code)
+ }
+
+ /**
+  * Escapes `path` by replacing every character for which `needsEscaping` holds with '%'
+  * followed by its code as (at least) two lowercase hexadecimal digits.
+  */
+ def escapePathName(path: String): String = {
+   val escaped = path.foldLeft(new StringBuilder()) { (out, c) =>
+     if (needsEscaping(c)) {
+       out.append('%').append(f"${c.toInt}%02x")
+     } else {
+       out.append(c)
+     }
+   }
+
+   escaped.toString()
+ }
+
+ /**
+  * Reverses path name escaping: a '%' followed by at least two more characters that parse as
+  * a non-negative hexadecimal number is replaced by the character with that code. Any other
+  * character, including a '%' that does not start a valid escape, is copied unchanged.
+  */
+ def unescapePathName(path: String): String = {
+   // Code encoded by the two characters after position `at`, or -1 if they are not valid hex.
+   def decodeAt(at: Int): Int =
+     try {
+       Integer.valueOf(path.substring(at + 1, at + 3), 16).intValue
+     } catch {
+       case _: Exception => -1
+     }
+
+   val out = new StringBuilder
+   var pos = 0
+
+   while (pos < path.length) {
+     val current = path.charAt(pos)
+     // Only attempt decoding when two more characters follow the '%'.
+     val decoded = if (current == '%' && pos + 2 < path.length) decodeAt(pos) else -1
+     if (decoded >= 0) {
+       out.append(decoded.toChar)
+       pos += 3
+     } else {
+       out.append(current)
+       pos += 1
+     }
+   }
+
+   out.toString()
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
new file mode 100644
index 0000000..84a0441
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.{Date, UUID}
+
+import scala.collection.JavaConversions.asScalaIterator
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
+import org.apache.spark._
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.util.SerializableConfiguration
+
+
+/**
+ * Command that inserts the rows produced by `query` into the [[InsertableRelation]] wrapped by
+ * `logicalRelation` and then invalidates the cache for that relation.
+ */
+private[sql] case class InsertIntoDataSource(
+    logicalRelation: LogicalRelation,
+    query: LogicalPlan,
+    overwrite: Boolean)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val insertable = logicalRelation.relation.asInstanceOf[InsertableRelation]
+    // The rows of the query are re-wrapped with the schema of the existing table.
+    val queryRows = DataFrame(sqlContext, query).queryExecution.toRdd
+    val toInsert = sqlContext.internalCreateDataFrame(queryRows, logicalRelation.schema)
+    insertable.insert(toInsert, overwrite)
+
+    // Invalidate the cache.
+    sqlContext.cacheManager.invalidateCache(logicalRelation)
+
+    Seq.empty[Row]
+  }
+}
+
+/**
+ * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
+ * Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a
+ * single write job, and owns a UUID that identifies this job. Each concrete implementation of
+ * [[HadoopFsRelation]] should use this UUID together with task id to generate unique file path for
+ * each task output file. This UUID is passed to executor side via a property named
+ * `spark.sql.sources.writeJobUUID`.
+ *
+ * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]]
+ * are used to write to normal tables and tables with dynamic partitions.
+ *
+ * Basic work flow of this command is:
+ *
+ * 1. Driver side setup, including output committer initialization and data source specific
+ * preparation work for the write job to be issued.
+ * 2. Issues a write job consisting of one or more executor side tasks, each of which writes all
+ * rows within an RDD partition.
+ * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any
+ * exception is thrown during task commitment, also aborts that task.
+ * 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is
+ * thrown during job commitment, also aborts the job.
+ */
+private[sql] case class InsertIntoHadoopFsRelation(
+ @transient relation: HadoopFsRelation,
+ @transient query: LogicalPlan,
+ mode: SaveMode)
+ extends RunnableCommand {
+
+  /**
+   * Resolves the save mode against the current state of the single output path and, when data
+   * has to be written, issues the write job (with or without dynamic partitions).
+   *
+   * Throws an `IllegalArgumentException` when the relation has more than one path, a
+   * `RuntimeException` when the mode is `ErrorIfExists` and the path exists, and a
+   * `java.io.IOException` when an existing path cannot be deleted in `Overwrite` mode.
+   *
+   * @return always an empty sequence
+   */
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    require(
+      relation.paths.length == 1,
+      s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
+
+    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+    val outputPath = new Path(relation.paths.head)
+    val fs = outputPath.getFileSystem(hadoopConf)
+    val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+
+    val pathExists = fs.exists(qualifiedOutputPath)
+    val doInsertion = (mode, pathExists) match {
+      case (SaveMode.ErrorIfExists, true) =>
+        sys.error(s"path $qualifiedOutputPath already exists.")
+      case (SaveMode.Overwrite, true) =>
+        // `delete` signals some failures through its return value only. Writing on top of a
+        // directory that could not be cleared would silently mix old and new data.
+        if (!fs.delete(qualifiedOutputPath, true)) {
+          throw new java.io.IOException(
+            s"Unable to clear output directory $qualifiedOutputPath prior to writing to it")
+        }
+        true
+      case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
+        true
+      case (SaveMode.Ignore, exists) =>
+        !exists
+    }
+    // If we are appending data to an existing dir.
+    val isAppend = pathExists && (mode == SaveMode.Append)
+
+    if (doInsertion) {
+      val job = new Job(hadoopConf)
+      job.setOutputKeyClass(classOf[Void])
+      job.setOutputValueClass(classOf[InternalRow])
+      FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
+
+      // We create a DataFrame by applying the schema of the relation to the data, to make sure
+      // that we are writing data based on the expected schema.
+      val df = {
+        // For partitioned relation r, r.schema's column ordering can be different from the column
+        // ordering of data.logicalPlan (partition columns are all moved after data column). We
+        // need a Project to adjust the ordering, so that inside InsertIntoHadoopFsRelation, we can
+        // safely apply the schema of r.schema to the data.
+        val project = Project(
+          relation.schema.map(field => new UnresolvedAttribute(Seq(field.name))), query)
+
+        sqlContext.internalCreateDataFrame(
+          DataFrame(sqlContext, project).queryExecution.toRdd, relation.schema)
+      }
+
+      val partitionColumns = relation.partitionColumns.fieldNames
+      if (partitionColumns.isEmpty) {
+        insert(new DefaultWriterContainer(relation, job, isAppend), df)
+      } else {
+        val writerContainer = new DynamicPartitionWriterContainer(
+          relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
+        insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
+      }
+    }
+
+    Seq.empty[Row]
+  }
+
+  /**
+   * Inserts the content of the [[DataFrame]] into a table without any partitioning columns.
+   *
+   * The job is committed and the relation refreshed when all tasks succeed. Any failure aborts
+   * the job and is rethrown wrapped in a [[SparkException]].
+   */
+  private def insert(writerContainer: BaseWriterContainer, df: DataFrame): Unit = {
+    // Read into local vals up front: `relation` is a @transient constructor parameter, so the
+    // task function below must not reach through it.
+    val needsConversion = relation.needConversion
+    val dataSchema = relation.dataSchema
+
+    // Body of a single write task: sets the task up, writes all rows of one RDD partition and
+    // commits. If anything fails, the task is aborted.
+    def writeRows(taskContext: TaskContext, rows: Iterator[InternalRow]): Unit = {
+      try {
+        writerContainer.executorSideSetup(taskContext)
+
+        val toExternalRow: InternalRow => Row =
+          if (needsConversion) {
+            CatalystTypeConverters.createToScalaConverter(dataSchema)
+              .asInstanceOf[InternalRow => Row]
+          } else {
+            (row: InternalRow) => row.asInstanceOf[Row]
+          }
+
+        while (rows.hasNext) {
+          val current = rows.next()
+          writerContainer.outputWriterForRow(current).write(toExternalRow(current))
+        }
+
+        writerContainer.commitTask()
+      } catch {
+        case cause: Throwable =>
+          logError("Aborting task.", cause)
+          writerContainer.abortTask()
+          throw new SparkException("Task failed while writing rows.", cause)
+      }
+    }
+
+    // Kept outside of the `try` block below: this call only initializes and prepares the job, so
+    // an exception thrown from here must not cause abortJob() to be called.
+    writerContainer.driverSideSetup()
+
+    try {
+      df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
+      writerContainer.commitJob()
+      relation.refresh()
+    } catch {
+      case cause: Throwable =>
+        logError("Aborting job.", cause)
+        writerContainer.abortJob()
+        throw new SparkException("Job aborted.", cause)
+    }
+  }
+
+  /**
+   * Inserts the content of the [[DataFrame]] into a table with partitioning columns.
+   *
+   * Requires that `df` has exactly the schema of the relation and that `partitionColumns` equals
+   * the partition columns of the relation. Each row is split into its partition values (cast to
+   * strings), which select the output writer, and its data values, which are written.
+   */
+  private def insertWithDynamicPartitions(
+      sqlContext: SQLContext,
+      writerContainer: BaseWriterContainer,
+      df: DataFrame,
+      partitionColumns: Array[String]): Unit = {
+    // Read into local vals up front: `relation` is a @transient constructor parameter, so the
+    // task function below must not reach through it.
+    val needsConversion = relation.needConversion
+    val dataSchema = relation.dataSchema
+
+    require(
+      df.schema == relation.schema,
+      s"""DataFrame must have the same schema as the relation to which is inserted.
+         |DataFrame schema: ${df.schema}
+         |Relation schema: ${relation.schema}
+       """.stripMargin)
+
+    val partitionColumnsInSpec = relation.partitionColumns.fieldNames
+    require(
+      partitionColumnsInSpec.sameElements(partitionColumns),
+      s"""Partition columns mismatch.
+         |Expected: ${partitionColumnsInSpec.mkString(", ")}
+         |Actual: ${partitionColumns.mkString(", ")}
+       """.stripMargin)
+
+    val output = df.queryExecution.executedPlan.output
+    val (partitionOutput, dataOutput) = output.partition(a => partitionColumns.contains(a.name))
+    val codegenEnabled = df.sqlContext.conf.codegenEnabled
+
+    // Body of a single write task. If anything fails, the task is aborted.
+    def writeRows(taskContext: TaskContext, rows: Iterator[InternalRow]): Unit = {
+      try {
+        writerContainer.executorSideSetup(taskContext)
+
+        // Partition columns are projected and cast to strings to build partition directories.
+        val partitionProj =
+          newProjection(codegenEnabled, partitionOutput.map(Cast(_, StringType)), output)
+        val dataProj = newProjection(codegenEnabled, dataOutput, output)
+
+        val toExternalRow: InternalRow => Row =
+          if (needsConversion) {
+            CatalystTypeConverters.createToScalaConverter(dataSchema)
+              .asInstanceOf[InternalRow => Row]
+          } else {
+            (row: InternalRow) => row.asInstanceOf[Row]
+          }
+
+        while (rows.hasNext) {
+          val current = rows.next()
+          val partitionValues = partitionProj(current)
+          val dataValues = toExternalRow(dataProj(current))
+          writerContainer.outputWriterForRow(partitionValues).write(dataValues)
+        }
+
+        writerContainer.commitTask()
+      } catch {
+        case cause: Throwable =>
+          logError("Aborting task.", cause)
+          writerContainer.abortTask()
+          throw new SparkException("Task failed while writing rows.", cause)
+      }
+    }
+
+    // Kept outside of the `try` block below: this call only initializes and prepares the job, so
+    // an exception thrown from here must not cause abortJob() to be called.
+    writerContainer.driverSideSetup()
+
+    try {
+      df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
+      writerContainer.commitJob()
+      relation.refresh()
+    } catch {
+      case cause: Throwable =>
+        logError("Aborting job.", cause)
+        writerContainer.abortJob()
+        throw new SparkException("Job aborted.", cause)
+    }
+  }
+
+  // This is copied from SparkPlan, probably should move this to a more general place.
+  //
+  // Builds a projection for `expressions` over `inputSchema`. With code generation enabled a
+  // generated projection is tried first; if generating it fails, the failure is rethrown when the
+  // "spark.testing" system property is set, otherwise it is logged and the interpreted
+  // projection is used instead.
+  private def newProjection(
+      codegenEnabled: Boolean,
+      expressions: Seq[Expression],
+      inputSchema: Seq[Attribute]): Projection = {
+    log.debug(
+      s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
+
+    if (!codegenEnabled) {
+      new InterpretedProjection(expressions, inputSchema)
+    } else {
+      try {
+        GenerateProjection.generate(expressions, inputSchema)
+      } catch {
+        case e: Exception if sys.props.contains("spark.testing") =>
+          throw e
+        case e: Exception =>
+          log.error("failed to generate projection, fallback to interpreted", e)
+          new InterpretedProjection(expressions, inputSchema)
+      }
+    }
+  }
+}
+
+private[sql] abstract class BaseWriterContainer(
+ @transient val relation: HadoopFsRelation,
+ @transient job: Job,
+ isAppend: Boolean)
+ extends SparkHadoopMapReduceUtil
+ with Logging
+ with Serializable {
+
+ protected val serializableConf = new SerializableConfiguration(job.getConfiguration)
+
+ // This UUID is used to avoid output file name collision between different appending write jobs.
+ // These jobs may belong to different SparkContext instances. Concrete data source implementations
+ // may use this UUID to generate unique file names (e.g., `part-r-<task-id>-<job-uuid>.parquet`).
+ // The reason why this ID is used to identify a job rather than a single task output file is
+ // that, speculative tasks must generate the same output file name as the original task.
+ private val uniqueWriteJobId = UUID.randomUUID()
+
+ // This is only used on driver side.
+ @transient private val jobContext: JobContext = job
+
+ // The following fields are initialized and used on both driver and executor side.
+ @transient protected var outputCommitter: OutputCommitter = _
+ @transient private var jobId: JobID = _
+ @transient private var taskId: TaskID = _
+ @transient private var taskAttemptId: TaskAttemptID = _
+ @transient protected var taskAttemptContext: TaskAttemptContext = _
+
+ protected val outputPath: String = {
+ assert(
+ relation.paths.length == 1,
+ s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
+ relation.paths.head
+ }
+
+ protected val dataSchema = relation.dataSchema
+
+ protected var outputWriterFactory: OutputWriterFactory = _
+
+ private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _
+
+ /**
+ * Driver side initialization of the write job: sets up the Hadoop IDs (all zero here) and the
+ * configuration, stores the write job UUID in the job configuration, lets the relation prepare
+ * the job, and finally creates the output committer and calls its `setupJob`.
+ */
+ def driverSideSetup(): Unit = {
+ setupIDs(0, 0, 0)
+ setupConf()
+
+ // This UUID is sent to executor side together with the serialized `Configuration` object within
+ // the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate
+ // unique task output files.
+ job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
+
+ // Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
+ // clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
+ // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
+ //
+ // Also, the `prepareJobForWrite` call must happen before initializing output format and output
+ // committer, since their initialization involve the job configuration, which can be potentially
+ // decorated in `prepareJobForWrite`.
+ outputWriterFactory = relation.prepareJobForWrite(job)
+ taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
+
+ outputFormatClass = job.getOutputFormatClass
+ outputCommitter = newOutputCommitter(taskAttemptContext)
+ outputCommitter.setupJob(jobContext)
+ }
+
+ /**
+ * Executor side initialization of one task: derives the Hadoop IDs from the stage id, partition
+ * id and attempt number of `taskContext`, creates the task attempt context and the output
+ * committer, calls the committer's `setupTask` and finally `initWriters()`.
+ */
+ def executorSideSetup(taskContext: TaskContext): Unit = {
+ setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber())
+ setupConf()
+ taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
+ outputCommitter = newOutputCommitter(taskAttemptContext)
+ outputCommitter.setupTask(taskAttemptContext)
+ initWriters()
+ }
+
+  /**
+   * Directory that task output should be written to: the work path of the committer when it is
+   * a `FileOutputCommitter` (which writes to a temporary location), otherwise the output path.
+   */
+  protected def getWorkPath: String = outputCommitter match {
+    case fileCommitter: MapReduceFileOutputCommitter => fileCommitter.getWorkPath.toString
+    case _ => outputPath
+  }
+
+  /**
+   * Creates the output committer for `context`.
+   *
+   * When appending, the committer of the output format is always used. Otherwise the class
+   * configured under `SQLConf.OUTPUT_COMMITTER_CLASS` takes precedence, and the committer of the
+   * output format is the fallback when no class is configured.
+   */
+  private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
+    val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
+
+    if (isAppend) {
+      // When appending data to an existing dir only the output committer associated with the
+      // file output format is used, since it is not safe to use a custom committer for
+      // appending. For example, in S3, direct parquet output committer may leave partial data in
+      // the destination dir when the appending job fails.
+      logInfo(
+        s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
+          "for appending.")
+      defaultOutputCommitter
+    } else {
+      val configuredClass = context.getConfiguration.getClass(
+        SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
+
+      if (configuredClass == null) {
+        // No committer class is set: use the one associated with the file output format.
+        logInfo(
+          s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
+        defaultOutputCommitter
+      } else {
+        logInfo(s"Using user defined output committer class ${configuredClass.getCanonicalName}")
+
+        // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat has
+        // an associated output committer. The committer set in SQLConf.OUTPUT_COMMITTER_CLASS
+        // overrides it. A data source that needs to override the output committer has to set it
+        // while preparing the job for the write.
+        if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(configuredClass)) {
+          // A FileOutputCommitter: use its (Path, TaskAttemptContext) constructor.
+          configuredClass
+            .getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+            .newInstance(new Path(outputPath), context)
+        } else {
+          // Just an OutputCommitter: use the no-argument constructor.
+          configuredClass.getDeclaredConstructor().newInstance()
+        }
+      }
+    }
+  }
+
+  /** Derives the Hadoop job, task and task attempt IDs from the given numeric ids. */
+  private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
+    val hadoopJobId = SparkHadoopWriter.createJobID(new Date, jobId)
+    val hadoopTaskId = new TaskID(hadoopJobId, true, splitId)
+
+    this.jobId = hadoopJobId
+    this.taskId = hadoopTaskId
+    this.taskAttemptId = new TaskAttemptID(hadoopTaskId, attemptId)
+  }
+
+  /** Copies the current job, task and task attempt IDs into the `mapred.*` properties. */
+  private def setupConf(): Unit = {
+    val conf = serializableConf.value
+    conf.set("mapred.job.id", jobId.toString)
+    conf.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
+    conf.set("mapred.task.id", taskAttemptId.toString)
+    conf.setBoolean("mapred.task.is.map", true)
+    conf.setInt("mapred.task.partition", 0)
+  }
+
+ // Called on executor side when writing rows
+ def outputWriterForRow(row: InternalRow): OutputWriter
+
+ // Called as the last step of `executorSideSetup`, i.e. after the task attempt context and the
+ // output committer have been created.
+ protected def initWriters(): Unit
+
+  /** Commits the output of the current task attempt through `SparkHadoopMapRedUtil`. */
+  def commitTask(): Unit = {
+    val committer = outputCommitter
+    val attemptContext = taskAttemptContext
+    SparkHadoopMapRedUtil.commitTask(
+      committer, attemptContext, jobId.getId, taskId.getId, taskAttemptId.getId)
+  }
+
+  /** Aborts the current task attempt; the committer may not have been created yet. */
+  def abortTask(): Unit = {
+    Option(outputCommitter).foreach(_.abortTask(taskAttemptContext))
+    logError(s"Task attempt $taskAttemptId aborted.")
+  }
+
+  /** Commits the whole write job. */
+  def commitJob(): Unit = {
+    val committer = outputCommitter
+    committer.commitJob(jobContext)
+    logInfo(s"Job $jobId committed.")
+  }
+
+  /** Aborts the whole write job; the committer may not have been created yet. */
+  def abortJob(): Unit = {
+    Option(outputCommitter).foreach(_.abortJob(jobContext, JobStatus.State.FAILED))
+    logError(s"Job $jobId aborted.")
+  }
+}
+
+/**
+ * Writer container in which a task writes all of its rows through one single [[OutputWriter]].
+ */
+private[sql] class DefaultWriterContainer(
+    @transient relation: HadoopFsRelation,
+    @transient job: Job,
+    isAppend: Boolean)
+  extends BaseWriterContainer(relation, job, isAppend) {
+
+  // Created by `initWriters()`; null until then.
+  @transient private var writer: OutputWriter = _
+
+  override protected def initWriters(): Unit = {
+    val conf = taskAttemptContext.getConfiguration
+    conf.set("spark.sql.sources.output.path", outputPath)
+    writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
+  }
+
+  override def outputWriterForRow(row: InternalRow): OutputWriter = writer
+
+  override def commitTask(): Unit = {
+    try {
+      assert(writer != null, "OutputWriter instance should have been initialized")
+      writer.close()
+      super.commitTask()
+    } catch {
+      case cause: Throwable =>
+        // Rethrown so that the caller of `commitTask()` sees the failure and can abort the task.
+        throw new RuntimeException("Failed to commit task", cause)
+    }
+  }
+
+  override def abortTask(): Unit = {
+    try {
+      // The task may have failed before `writer` was initialized.
+      Option(writer).foreach(_.close())
+    } finally {
+      super.abortTask()
+    }
+  }
+}
+
+/**
+ * Writer container for tables with dynamic partitions. Each distinct partition path seen by a
+ * task gets its own [[OutputWriter]], which is created on first use and cached by that path.
+ */
+private[sql] class DynamicPartitionWriterContainer(
+    @transient relation: HadoopFsRelation,
+    @transient job: Job,
+    partitionColumns: Array[String],
+    defaultPartitionName: String,
+    isAppend: Boolean)
+  extends BaseWriterContainer(relation, job, isAppend) {
+
+  // All output writers are created on executor side. Null until `initWriters()` has run.
+  @transient protected var outputWriters: java.util.HashMap[String, OutputWriter] = _
+
+  override protected def initWriters(): Unit = {
+    outputWriters = new java.util.HashMap[String, OutputWriter]
+  }
+
+  /**
+   * Returns the writer for the partition described by `row`, creating it on first use.
+   *
+   * The `row` argument is supposed to only contain partition column values which have been
+   * casted to strings, in the order of `partitionColumns`. A null value is replaced by
+   * `defaultPartitionName`; any other value is escaped before it is used in the path.
+   */
+  override def outputWriterForRow(row: InternalRow): OutputWriter = {
+    val partitionPath = {
+      val partitionPathBuilder = new StringBuilder
+      var i = 0
+
+      while (i < partitionColumns.length) {
+        val col = partitionColumns(i)
+        val partitionValueString = {
+          val string = row.getString(i)
+          if (string == null) defaultPartitionName else PartitioningUtils.escapePathName(string)
+        }
+
+        if (i > 0) {
+          partitionPathBuilder.append(Path.SEPARATOR_CHAR)
+        }
+
+        partitionPathBuilder.append(s"$col=$partitionValueString")
+        i += 1
+      }
+
+      partitionPathBuilder.toString()
+    }
+
+    val writer = outputWriters.get(partitionPath)
+    if (writer == null) {
+      val path = new Path(getWorkPath, partitionPath)
+      taskAttemptContext.getConfiguration.set(
+        "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
+      val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
+      outputWriters.put(partitionPath, newWriter)
+      newWriter
+    } else {
+      writer
+    }
+  }
+
+  /** Closes all writers created so far and forgets them. */
+  private def clearOutputWriters(): Unit = {
+    // `outputWriters` is still null when the task failed before `initWriters()` was reached.
+    // Without this check `abortTask()` would throw a NullPointerException in that situation and
+    // thereby hide the failure that caused the abort.
+    if (outputWriters != null && !outputWriters.isEmpty) {
+      asScalaIterator(outputWriters.values().iterator()).foreach(_.close())
+      outputWriters.clear()
+    }
+  }
+
+  override def commitTask(): Unit = {
+    try {
+      clearOutputWriters()
+      super.commitTask()
+    } catch { case cause: Throwable =>
+      throw new RuntimeException("Failed to commit task", cause)
+    }
+  }
+
+  override def abortTask(): Unit = {
+    try {
+      clearOutputWriters()
+    } finally {
+      super.abortTask()
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org
[2/3] spark git commit: [SPARK-8906][SQL] Move all internal data
source classes into execution.datasources.
Posted by rx...@apache.org.
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
new file mode 100644
index 0000000..c8033d3
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import scala.language.{existentials, implicitConversions}
+import scala.util.matching.Regex
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.Logging
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext, SaveMode}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.{AbstractSparkSQLParser, InternalRow}
+import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+/**
+ * A parser for foreign DDL commands.
+ */
+private[sql] class DDLParser(
+ parseQuery: String => LogicalPlan)
+ extends AbstractSparkSQLParser with DataTypeParser with Logging {
+
+  /**
+   * Parses `input` as a DDL command. A [[DDLException]] is always rethrown. Any other failure
+   * propagates when `exceptionOnError` is true; otherwise `input` is handed to `parseQuery`.
+   */
+  def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
+    try {
+      parse(input)
+    } catch {
+      case ddlException: DDLException => throw ddlException
+      // With `exceptionOnError` set no case matches and the failure propagates unchanged.
+      case _: Throwable if !exceptionOnError => parseQuery(input)
+    }
+  }
+
+ // Keyword is a convention of AbstractSparkSQLParser, which scans all of the `Keyword`
+ // properties of the class via reflection at runtime to construct the SqlLexical object
+ protected val CREATE = Keyword("CREATE")
+ protected val TEMPORARY = Keyword("TEMPORARY")
+ protected val TABLE = Keyword("TABLE")
+ protected val IF = Keyword("IF")
+ protected val NOT = Keyword("NOT")
+ protected val EXISTS = Keyword("EXISTS")
+ protected val USING = Keyword("USING")
+ protected val OPTIONS = Keyword("OPTIONS")
+ protected val DESCRIBE = Keyword("DESCRIBE")
+ protected val EXTENDED = Keyword("EXTENDED")
+ protected val AS = Keyword("AS")
+ protected val COMMENT = Keyword("COMMENT")
+ protected val REFRESH = Keyword("REFRESH")
+
+ protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable
+
+ protected def start: Parser[LogicalPlan] = ddl
+
+  /**
+   * `CREATE [TEMPORARY] TABLE [IF NOT EXISTS] avroTable
+   *      USING org.apache.spark.sql.avro
+   *      OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
+   * or
+   * `CREATE [TEMPORARY] TABLE [IF NOT EXISTS] avroTable(intField int, stringField string...)
+   *      USING org.apache.spark.sql.avro
+   *      OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
+   * or
+   * `CREATE [TEMPORARY] TABLE [IF NOT EXISTS] avroTable
+   *      USING org.apache.spark.sql.avro
+   *      OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")
+   *      AS SELECT ...`
+   *
+   * A [[DDLException]] is thrown when TEMPORARY is combined with IF NOT EXISTS, and when an
+   * AS SELECT statement also contains column definitions.
+   */
+  protected lazy val createTable: Parser[LogicalPlan] =
+    // TODO: Support database.table.
+    (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~
+      tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
+      case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query =>
+        if (temp.isDefined && allowExisting.isDefined) {
+          throw new DDLException(
+            "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
+        }
+
+        val options = opts.getOrElse(Map.empty[String, String])
+        query match {
+          case Some(selectStatement) =>
+            if (columns.isDefined) {
+              throw new DDLException(
+                "a CREATE TABLE AS SELECT statement does not allow column definitions.")
+            }
+            // When the IF NOT EXISTS clause appears in the statement, the save mode is Ignore.
+            val mode = if (allowExisting.isDefined) {
+              SaveMode.Ignore
+            } else if (temp.isDefined) {
+              SaveMode.Overwrite
+            } else {
+              SaveMode.ErrorIfExists
+            }
+
+            val queryPlan = parseQuery(selectStatement)
+            CreateTableUsingAsSelect(tableName,
+              provider,
+              temp.isDefined,
+              Array.empty[String],
+              mode,
+              options,
+              queryPlan)
+
+          case None =>
+            val userSpecifiedSchema = columns.map(fields => StructType(fields))
+            CreateTableUsing(
+              tableName,
+              userSpecifiedSchema,
+              provider,
+              temp.isDefined,
+              options,
+              allowExisting.isDefined,
+              managedIfNoPath = false)
+        }
+    }
+
+ // Parenthesized, comma separated list of column definitions (see `column`).
+ protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"
+
+  /*
+   * DESCRIBE [EXTENDED] [dbName.]tableName
+   * Produces a DescribeCommand for the (optionally database qualified) table; the flag passed
+   * to it tells whether EXTENDED was given.
+   */
+  protected lazy val describeTable: Parser[LogicalPlan] =
+    (DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident ^^ {
+      case extended ~ database ~ table =>
+        // (database, table) when a database name is given, (table) otherwise.
+        val tableIdentifier = database.toList :+ table
+        DescribeCommand(UnresolvedRelation(tableIdentifier, None), extended.isDefined)
+    }
+
+  // REFRESH TABLE [dbName.]tableName; the database name falls back to "default".
+  protected lazy val refreshTable: Parser[LogicalPlan] =
+    REFRESH ~> TABLE ~> (ident <~ ".").? ~ ident ^^ {
+      case database ~ table =>
+        val databaseName = database.getOrElse("default")
+        RefreshTable(databaseName, table)
+    }
+
+  // Parenthesized, comma separated list of key/value pairs, collected into a map.
+  protected lazy val options: Parser[Map[String, String]] =
+    "(" ~> repsep(pair, ",") <~ ")" ^^ (pairs => pairs.toMap)
+
+  // Dot separated identifiers, joined back into one (class or package) name.
+  protected lazy val className: Parser[String] =
+    repsep(ident, ".") ^^ (parts => parts.mkString("."))
+
+  /**
+   * Turns `regex` into a parser that accepts one identifier or keyword token whose text matches
+   * the regex, and yields that text.
+   */
+  override implicit def regexToParser(regex: Regex): Parser[String] = {
+    def matches(text: String): Boolean = regex.unapplySeq(text).isDefined
+
+    acceptMatch(
+      s"identifier matching regex $regex", {
+        case lexical.Identifier(text) if matches(text) => text
+        case lexical.Keyword(text) if matches(text) => text
+      }
+    )
+  }
+
+  // One segment of an option name: a letter or underscore followed by letters, digits or
+  // underscores.
+  protected lazy val optionPart: Parser[String] =
+    "[_a-zA-Z][_a-zA-Z0-9]*".r ^^ (part => part)
+
+  // Dot separated option name, e.g. `a.b.c`.
+  protected lazy val optionName: Parser[String] =
+    repsep(optionPart, ".") ^^ (parts => parts.mkString("."))
+
+  // An option name followed by a string literal value.
+  protected lazy val pair: Parser[(String, String)] =
+    optionName ~ stringLit ^^ { case key ~ value => key -> value }
+
+  // Column definition: `name dataType [COMMENT "text"]`. The comment, when present, is stored in
+  // the field metadata under the lower-cased COMMENT keyword; columns are always nullable.
+  protected lazy val column: Parser[StructField] =
+    ident ~ dataType ~ (COMMENT ~> stringLit).? ^^ { case columnName ~ typ ~ cm =>
+      val meta = cm
+        .map(comment => new MetadataBuilder().putString(COMMENT.str.toLowerCase, comment).build())
+        .getOrElse(Metadata.empty)
+
+      StructField(columnName, typ, nullable = true, meta)
+    }
+}
+
+private[sql] object ResolvedDataSource {
+
+ // Short provider names accepted by `lookupDataSource`, mapped to the name of the data source
+ // class that is loaded for them.
+ private val builtinSources = Map(
+ "jdbc" -> "org.apache.spark.sql.jdbc.DefaultSource",
+ "json" -> "org.apache.spark.sql.json.DefaultSource",
+ "parquet" -> "org.apache.spark.sql.parquet.DefaultSource",
+ "orc" -> "org.apache.spark.sql.hive.orc.DefaultSource"
+ )
+
+  /**
+   * Given a provider name, look up the data source class definition.
+   *
+   * A built-in short name is resolved through `builtinSources`. Any other name is tried as a
+   * class name first and then as a package name containing a `DefaultSource` class.
+   */
+  def lookupDataSource(provider: String): Class[_] = {
+    val loader = Utils.getContextOrSparkClassLoader
+
+    builtinSources.get(provider) match {
+      case Some(builtinClassName) =>
+        loader.loadClass(builtinClassName)
+
+      case None =>
+        try {
+          loader.loadClass(provider)
+        } catch {
+          case _: java.lang.ClassNotFoundException =>
+            try {
+              loader.loadClass(provider + ".DefaultSource")
+            } catch {
+              case _: java.lang.ClassNotFoundException =>
+                val message =
+                  if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
+                    "The ORC data source must be used with Hive support enabled."
+                  } else {
+                    s"Failed to load class for data source: $provider"
+                  }
+                sys.error(message)
+            }
+        }
+    }
+  }
+
+  /**
+   * Create a [[ResolvedDataSource]] for reading data in.
+   *
+   * The provider class is instantiated and asked to create the relation. Which provider
+   * interfaces are acceptable depends on whether a schema was specified by the user; an
+   * [[AnalysisException]] is thrown when the provider implements none of the acceptable ones.
+   * For a [[HadoopFsRelationProvider]] the "path" option is required.
+   */
+  def apply(
+      sqlContext: SQLContext,
+      userSpecifiedSchema: Option[StructType],
+      partitionColumns: Array[String],
+      provider: String,
+      options: Map[String, String]): ResolvedDataSource = {
+    val clazz: Class[_] = lookupDataSource(provider)
+    def className: String = clazz.getCanonicalName
+
+    // Qualifies the "path" option against its file system and expands it if it is a glob pattern.
+    def resolvePaths(caseInsensitiveOptions: Map[String, String]): Array[String] = {
+      val patternPath = new Path(caseInsensitiveOptions("path"))
+      val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+      val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+      SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
+    }
+
+    val relation = userSpecifiedSchema match {
+      case Some(schema: StructType) => clazz.newInstance() match {
+        case dataSource: SchemaRelationProvider =>
+          dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
+        case dataSource: HadoopFsRelationProvider =>
+          val maybePartitionsSchema = if (partitionColumns.isEmpty) {
+            None
+          } else {
+            Some(partitionColumnsSchema(schema, partitionColumns))
+          }
+
+          val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+          val paths = resolvePaths(caseInsensitiveOptions)
+
+          // The data schema is the user-specified schema without the partition columns.
+          val dataSchema =
+            StructType(schema.filterNot(f => partitionColumns.contains(f.name))).asNullable
+
+          dataSource.createRelation(
+            sqlContext,
+            paths,
+            Some(dataSchema),
+            maybePartitionsSchema,
+            caseInsensitiveOptions)
+        case dataSource: org.apache.spark.sql.sources.RelationProvider =>
+          throw new AnalysisException(s"$className does not allow user-specified schemas.")
+        case _ =>
+          throw new AnalysisException(s"$className is not a RelationProvider.")
+      }
+
+      case None => clazz.newInstance() match {
+        case dataSource: RelationProvider =>
+          dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
+        case dataSource: HadoopFsRelationProvider =>
+          val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+          val paths = resolvePaths(caseInsensitiveOptions)
+          dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
+        case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
+          throw new AnalysisException(
+            s"A schema needs to be specified when using $className.")
+        case _ =>
+          // The message names the interface that is actually matched above.
+          throw new AnalysisException(
+            s"$className is neither a RelationProvider nor a HadoopFsRelationProvider.")
+      }
+    }
+    new ResolvedDataSource(clazz, relation)
+  }
+
+ /**
+  * Builds the schema of the partition columns by picking, for each requested column name
+  * and in the requested order, the matching field of `schema`. The result is made nullable.
+  *
+  * @throws RuntimeException if a partition column has no field of the same name in `schema`
+  */
+ private def partitionColumnsSchema(
+     schema: StructType,
+     partitionColumns: Array[String]): StructType = {
+   val partitionFields = partitionColumns.map { col =>
+     schema.find(field => field.name == col) match {
+       case Some(field) => field
+       case None =>
+         throw new RuntimeException(s"Partition column $col not found in schema $schema")
+     }
+   }
+   StructType(partitionFields).asNullable
+ }
+
+ /**
+  * Create a [[ResolvedDataSource]] for saving the content of the given [[DataFrame]].
+  *
+  * A [[CreatableRelationProvider]] is handed the data directly. A
+  * [[HadoopFsRelationProvider]] first creates a relation for the output path and the data
+  * is then written by executing an `InsertIntoHadoopFsRelation` plan. Any other provider
+  * is rejected.
+  *
+  * @throws AnalysisException if `data` contains a column of [[IntervalType]]
+  */
+ def apply(
+     sqlContext: SQLContext,
+     provider: String,
+     partitionColumns: Array[String],
+     mode: SaveMode,
+     options: Map[String, String],
+     data: DataFrame): ResolvedDataSource = {
+   // Reject interval columns before the provider is even looked up.
+   val hasIntervalColumn = data.schema.exists(_.dataType.isInstanceOf[IntervalType])
+   if (hasIntervalColumn) {
+     throw new AnalysisException("Cannot save interval data type into external storage.")
+   }
+
+   val clazz: Class[_] = lookupDataSource(provider)
+   val relation = clazz.newInstance() match {
+     case creatable: CreatableRelationProvider =>
+       creatable.createRelation(sqlContext, mode, options, data)
+
+     case fsProvider: HadoopFsRelationProvider =>
+       // The output path is deliberately not globbed. The contracts on the write path are:
+       //  1. Only one output path can be specified;
+       //  2. The output path must be a legal HDFS style file system path;
+       //  3. The output path is allowed not to exist yet.
+       val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+       val rawPath = new Path(caseInsensitiveOptions("path"))
+       val fileSystem = rawPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+       val outputPath = rawPath.makeQualified(fileSystem.getUri, fileSystem.getWorkingDirectory)
+
+       val dataFields = data.schema.filterNot(field => partitionColumns.contains(field.name))
+       val nullableDataSchema = StructType(dataFields).asNullable
+       val partitionSchema = partitionColumnsSchema(data.schema, partitionColumns)
+
+       val created = fsProvider.createRelation(
+         sqlContext,
+         Array(outputPath.toString),
+         Some(nullableDataSchema),
+         Some(partitionSchema),
+         caseInsensitiveOptions)
+
+       // For a partitioned relation, the column ordering of its schema can differ from the
+       // column ordering of data.logicalPlan (partition columns are all moved after the data
+       // columns). This is adjusted within InsertIntoHadoopFsRelation.
+       val insertion = InsertIntoHadoopFsRelation(created, data.logicalPlan, mode)
+       sqlContext.executePlan(insertion).toRdd
+       created
+
+     case _ =>
+       sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
+   }
+   new ResolvedDataSource(clazz, relation)
+ }
+}
+
+/**
+ * Pairs a data source provider class with a [[BaseRelation]].
+ *
+ * @param provider the data source provider class
+ * @param relation the relation associated with that provider
+ */
+private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
+
+ /**
+  * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
+  *
+  * The node has no children and exposes three non-nullable string columns: `col_name`,
+  * `data_type` and `comment`.
+  *
+  * @param table The table to be described.
+  * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
+  *                   It is effective only when the table is a Hive table.
+  */
+ private[sql] case class DescribeCommand(
+     table: LogicalPlan,
+     isExtended: Boolean) extends LogicalPlan with Command {
+
+   override def children: Seq[LogicalPlan] = Seq.empty
+
+   override val output: Seq[Attribute] = {
+     // Builds one non-nullable string column whose metadata carries a description under
+     // the "comment" key.
+     def describedColumn(name: String, description: String): Attribute = {
+       val metadata = new MetadataBuilder().putString("comment", description).build()
+       AttributeReference(name, StringType, nullable = false, metadata)()
+     }
+
+     // Column names are based on Hive.
+     Seq(
+       describedColumn("col_name", "name of the column"),
+       describedColumn("data_type", "data type of the column"),
+       describedColumn("comment", "comment of the column"))
+   }
+ }
+
+ /**
+  * Logical node representing the creation of a table backed by a data source.
+  * It is a leaf command: it has no children and produces no output attributes.
+  *
+  * @param allowExisting When true, nothing is done if the table already exists.
+  *                      When false, an exception will be thrown in that case.
+  */
+ private[sql] case class CreateTableUsing(
+     tableName: String,
+     userSpecifiedSchema: Option[StructType],
+     provider: String,
+     temporary: Boolean,
+     options: Map[String, String],
+     allowExisting: Boolean,
+     managedIfNoPath: Boolean) extends LogicalPlan with Command {
+
+   override def output: Seq[Attribute] = Nil
+
+   override def children: Seq[LogicalPlan] = Nil
+ }
+
+ /**
+  * A node used to support CTAS statements and saveAsTable for the data source API.
+  * This node is a [[UnaryNode]] rather than a [[Command]] so that the analyzer can analyze
+  * the logical plan that will be used to populate the table. This allows [[PreWriteCheck]]
+  * to detect cases that are not allowed.
+  */
+ private[sql] case class CreateTableUsingAsSelect(
+     tableName: String,
+     provider: String,
+     temporary: Boolean,
+     partitionColumns: Array[String],
+     mode: SaveMode,
+     options: Map[String, String],
+     child: LogicalPlan) extends UnaryNode {
+
+   override def output: Seq[Attribute] = Nil
+
+   // TODO: Override resolved after we support databaseName.
+   // override lazy val resolved = databaseName != None && childrenResolved
+ }
+
+ /**
+  * Command that resolves a data source for reading and registers the resulting relation
+  * as a table named `tableName` through `SQLContext.registerDataFrameAsTable`.
+  *
+  * No partition columns are passed when resolving the data source. The command returns
+  * no rows.
+  */
+ private[sql] case class CreateTempTableUsing(
+     tableName: String,
+     userSpecifiedSchema: Option[StructType],
+     provider: String,
+     options: Map[String, String]) extends RunnableCommand {
+
+   // Declared with `override` for consistency with the other RunnableCommand
+   // implementations in this file.
+   override def run(sqlContext: SQLContext): Seq[InternalRow] = {
+     val resolved = ResolvedDataSource(
+       sqlContext, userSpecifiedSchema, Array.empty[String], provider, options)
+     sqlContext.registerDataFrameAsTable(
+       DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
+     Seq.empty[InternalRow]
+   }
+ }
+
+ /**
+  * Command that saves the result of `query` through a data source and then registers the
+  * relation returned by that data source as a table named `tableName`.
+  * The command returns no rows.
+  */
+ private[sql] case class CreateTempTableUsingAsSelect(
+     tableName: String,
+     provider: String,
+     partitionColumns: Array[String],
+     mode: SaveMode,
+     options: Map[String, String],
+     query: LogicalPlan) extends RunnableCommand {
+
+   override def run(sqlContext: SQLContext): Seq[InternalRow] = {
+     // Resolving the data source on the write path also performs the save.
+     val queryFrame = DataFrame(sqlContext, query)
+     val savedRelation =
+       ResolvedDataSource(sqlContext, provider, partitionColumns, mode, options, queryFrame)
+         .relation
+
+     val tableFrame = DataFrame(sqlContext, LogicalRelation(savedRelation))
+     sqlContext.registerDataFrameAsTable(tableFrame, tableName)
+
+     Nil
+   }
+ }
+
+ /**
+  * Command that refreshes the catalog metadata of a table and, if the table's plan has
+  * cached data, drops that cached data and registers the plan for caching again.
+  * The command returns no rows.
+  */
+ private[sql] case class RefreshTable(databaseName: String, tableName: String)
+   extends RunnableCommand {
+
+   override def run(sqlContext: SQLContext): Seq[InternalRow] = {
+     // The metadata has to be refreshed before the relation is looked up again.
+     sqlContext.catalog.refreshTable(databaseName, tableName)
+     val refreshedPlan = sqlContext.catalog.lookupRelation(Seq(databaseName, tableName))
+
+     // lookupCachedData is used directly because RefreshTable also takes a databaseName.
+     val cacheManager = sqlContext.cacheManager
+     if (cacheManager.lookupCachedData(refreshedPlan).nonEmpty) {
+       // TODO: Use uncacheTable once it supports database name.
+       val tableFrame = DataFrame(sqlContext, refreshedPlan)
+       // Drop the previously cached version, then cache the new one.
+       cacheManager.tryUncacheQuery(tableFrame, blocking = true)
+       cacheManager.cacheQuery(tableFrame, Some(tableName))
+     }
+
+     Seq.empty[InternalRow]
+   }
+ }
+
+ /**
+  * Builds a map in which keys are case insensitive.
+  *
+  * Keys of the wrapped map are lower-cased once at construction time and every lookup or
+  * removal lower-cases the requested key first. Lower-casing uses `Locale.ROOT` so that
+  * the result does not depend on the JVM default locale (for example, `"I"` must become
+  * `"i"` even under a Turkish locale).
+  *
+  * @param map the original, case-sensitive options
+  */
+ protected[sql] class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
+   with Serializable {
+
+   /** The wrapped entries with all keys lower-cased. */
+   val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase(java.util.Locale.ROOT)))
+
+   override def get(k: String): Option[String] = baseMap.get(k.toLowerCase(java.util.Locale.ROOT))
+
+   // The value type may be widened to B1, so the result cannot be a CaseInsensitiveMap;
+   // it is a plain map whose keys are all lower-case.
+   override def + [B1 >: String](kv: (String, B1)): Map[String, B1] =
+     baseMap + kv.copy(_1 = kv._1.toLowerCase(java.util.Locale.ROOT))
+
+   override def iterator: Iterator[(String, String)] = baseMap.iterator
+
+   // Re-wrap the result so that removing a key does not silently turn the map back into a
+   // case-sensitive one.
+   override def -(key: String): Map[String, String] =
+     new CaseInsensitiveMap(baseMap - key.toLowerCase(java.util.Locale.ROOT))
+ }
+
+/**
+ * The exception thrown from the DDL parser.
+ *
+ * It is a checked-style [[Exception]] that carries only a message; no cause is attached.
+ *
+ * @param message description of the failure
+ */
+protected[sql] class DDLException(message: String) extends Exception(message)
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
new file mode 100644
index 0000000..11bb49b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.analysis.{Catalog, EliminateSubQueries}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation, InsertableRelation}
+
+ /**
+  * A rule that performs pre-insert data type casting and field renaming. Before inserting
+  * into an [[InsertableRelation]] or a [[HadoopFsRelation]], this rule makes sure that the
+  * columns to be inserted have the data types and names expected by the relation.
+  */
+ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
+   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+     // Leave the node untouched until all of its children are resolved.
+     case p: LogicalPlan if !p.childrenResolved => p
+
+     // Inserting into an InsertableRelation or a HadoopFsRelation.
+     case i @ InsertIntoTable(
+         l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation), _, child, _, _) =>
+       val expectedColumns = l.output
+       // The query has to produce exactly as many columns as the relation's schema has.
+       if (expectedColumns.size != child.output.size) {
+         sys.error(
+           s"$l requires that the query in the SELECT clause of the INSERT INTO/OVERWRITE " +
+             s"statement generates the same number of columns as its schema.")
+       }
+       castAndRenameChildOutput(i, expectedColumns, child)
+   }
+
+   /**
+    * If necessary, cast data types and rename fields to the expected types and names.
+    *
+    * Columns are paired by position. When nothing has to change, `insertInto` is returned
+    * as is; otherwise its child is wrapped in a [[Project]] that applies the casts/aliases.
+    */
+   def castAndRenameChildOutput(
+       insertInto: InsertIntoTable,
+       expectedOutput: Seq[Attribute],
+       child: LogicalPlan): InsertIntoTable = {
+     val newChildOutput = expectedOutput.zip(child.output).map { case (expected, actual) =>
+       val sameType = expected.dataType.sameType(actual.dataType)
+       // The field names in the data to be inserted must exactly match the names in the
+       // schema.
+       val sameName = expected.name == actual.name
+       if (!sameType) {
+         // A cast always re-aliases to the expected name, whether or not the name differs.
+         Alias(Cast(actual, expected.dataType), expected.name)()
+       } else if (!sameName) {
+         Alias(actual, expected.name)()
+       } else {
+         actual
+       }
+     }
+
+     if (newChildOutput == child.output) {
+       insertInto
+     } else {
+       insertInto.copy(child = Project(newChildOutput, child))
+     }
+   }
+ }
+
+ /**
+  * A rule to do various checks before inserting into or writing to a data source table.
+  * Every node of the plan is visited; a violated check raises an [[AnalysisException]].
+  */
+ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => Unit) {
+   def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) }
+
+   /** Collects every data source relation that `query` reads from. */
+   private def sourceRelations(query: LogicalPlan): Seq[BaseRelation] = query.collect {
+     case LogicalRelation(src: BaseRelation) => src
+   }
+
+   def apply(plan: LogicalPlan): Unit = {
+     plan.foreach {
+       case logical.InsertIntoTable(
+           l @ LogicalRelation(t: InsertableRelation), partition, query, _, _) =>
+         // Right now, we do not support insert into a data source table with partition specs.
+         if (partition.nonEmpty) {
+           failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
+         } else if (sourceRelations(query).contains(t)) {
+           // The target relation is also one of the query's inputs.
+           failAnalysis(
+             "Cannot insert overwrite into table that is also being read from.")
+         }
+
+       case logical.InsertIntoTable(LogicalRelation(r: HadoopFsRelation), part, _, _, _) =>
+         // The partition columns specified by the user have to match the partition columns
+         // of the relation.
+         val existingPartitionColumns = r.partitionColumns.fieldNames.toSet
+         val specifiedPartitionColumns = part.keySet
+         if (existingPartitionColumns != specifiedPartitionColumns) {
+           failAnalysis(s"Specified partition columns " +
+             s"(${specifiedPartitionColumns.mkString(", ")}) " +
+             s"do not match the partition columns of the table. Please use " +
+             s"(${existingPartitionColumns.mkString(", ")}) as the partition columns.")
+         }
+
+       case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) =>
+         // The relation in l is neither an InsertableRelation nor a HadoopFsRelation.
+         failAnalysis(s"$l does not allow insertion.")
+
+       case logical.InsertIntoTable(t, _, _, _, _) =>
+         val isRddBased =
+           !t.isInstanceOf[LeafNode] || t == OneRowRelation || t.isInstanceOf[LocalRelation]
+         if (isRddBased) {
+           failAnalysis(s"Inserting into an RDD-based table is not allowed.")
+         }
+
+       case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) =>
+         // When the SaveMode is Overwrite, the table must not be an input table of the
+         // query. If it is, an AnalysisException tells the user that this is not allowed.
+         if (catalog.tableExists(Seq(tableName))) {
+           // Need to remove SubQuery operator.
+           EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) match {
+             // Only do the check if the table is a data source table
+             // (the relation is a BaseRelation).
+             case LogicalRelation(dest: BaseRelation) if sourceRelations(query).contains(dest) =>
+               failAnalysis(
+                 s"Cannot overwrite table $tableName that is also being read from.")
+
+             case _ => // OK
+           }
+         }
+
+       case _ => // OK
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index e683eb0..2f9f880 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -35,15 +35,18 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.parquet.schema.MessageType
+import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.{SqlNewHadoopPartition, SqlNewHadoopRDD}
+import org.apache.spark.sql.execution.datasources.PartitionSpec
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
-import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
+
private[sql] class DefaultSource extends HadoopFsRelationProvider {
override def createRelation(
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
deleted file mode 100644
index 70c9e06..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ /dev/null
@@ -1,395 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources
-
-import org.apache.spark.{Logging, TaskContext}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.types.{StringType, StructType}
-import org.apache.spark.sql.{SaveMode, Strategy, execution, sources}
-import org.apache.spark.util.{SerializableConfiguration, Utils}
-import org.apache.spark.unsafe.types.UTF8String
-
-/**
- * A Strategy for planning scans over data sources defined using the sources API.
- */
-private[sql] object DataSourceStrategy extends Strategy with Logging {
- def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
- case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan)) =>
- pruneFilterProjectRaw(
- l,
- projects,
- filters,
- (a, f) => toCatalystRDD(l, a, t.buildScan(a, f))) :: Nil
-
- case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan)) =>
- pruneFilterProject(
- l,
- projects,
- filters,
- (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil
-
- case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan)) =>
- pruneFilterProject(
- l,
- projects,
- filters,
- (a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil
-
- // Scanning partitioned HadoopFsRelation
- case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation))
- if t.partitionSpec.partitionColumns.nonEmpty =>
- val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
-
- logInfo {
- val total = t.partitionSpec.partitions.length
- val selected = selectedPartitions.length
- val percentPruned = (1 - selected.toDouble / total.toDouble) * 100
- s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
- }
-
- // Only pushes down predicates that do not reference partition columns.
- val pushedFilters = {
- val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet
- filters.filter { f =>
- val referencedColumnNames = f.references.map(_.name).toSet
- referencedColumnNames.intersect(partitionColumnNames).isEmpty
- }
- }
-
- buildPartitionedTableScan(
- l,
- projects,
- pushedFilters,
- t.partitionSpec.partitionColumns,
- selectedPartitions) :: Nil
-
- // Scanning non-partitioned HadoopFsRelation
- case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
- // See buildPartitionedTableScan for the reason that we need to create a shard
- // broadcast HadoopConf.
- val sharedHadoopConf = SparkHadoopUtil.get.conf
- val confBroadcast =
- t.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
- pruneFilterProject(
- l,
- projects,
- filters,
- (a, f) =>
- toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f, t.paths, confBroadcast))) :: Nil
-
- case l @ LogicalRelation(t: TableScan) =>
- execution.PhysicalRDD(l.output, toCatalystRDD(l, t.buildScan())) :: Nil
-
- case i @ logical.InsertIntoTable(
- l @ LogicalRelation(t: InsertableRelation), part, query, overwrite, false) if part.isEmpty =>
- execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
-
- case i @ logical.InsertIntoTable(
- l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) =>
- val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
- execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil
-
- case _ => Nil
- }
-
- private def buildPartitionedTableScan(
- logicalRelation: LogicalRelation,
- projections: Seq[NamedExpression],
- filters: Seq[Expression],
- partitionColumns: StructType,
- partitions: Array[Partition]) = {
- val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]
-
- // Because we are creating one RDD per partition, we need to have a shared HadoopConf.
- // Otherwise, the cost of broadcasting HadoopConf in every RDD will be high.
- val sharedHadoopConf = SparkHadoopUtil.get.conf
- val confBroadcast =
- relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
-
- // Builds RDD[Row]s for each selected partition.
- val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
- // The table scan operator (PhysicalRDD) which retrieves required columns from data files.
- // Notice that the schema of data files, represented by `relation.dataSchema`, may contain
- // some partition column(s).
- val scan =
- pruneFilterProject(
- logicalRelation,
- projections,
- filters,
- (columns: Seq[Attribute], filters) => {
- val partitionColNames = partitionColumns.fieldNames
-
- // Don't scan any partition columns to save I/O. Here we are being optimistic and
- // assuming partition columns data stored in data files are always consistent with those
- // partition values encoded in partition directory paths.
- val needed = columns.filterNot(a => partitionColNames.contains(a.name))
- val dataRows =
- relation.buildScan(needed.map(_.name).toArray, filters, Array(dir), confBroadcast)
-
- // Merges data values with partition values.
- mergeWithPartitionValues(
- relation.schema,
- columns.map(_.name).toArray,
- partitionColNames,
- partitionValues,
- toCatalystRDD(logicalRelation, needed, dataRows))
- })
-
- scan.execute()
- }
-
- val unionedRows =
- if (perPartitionRows.length == 0) {
- relation.sqlContext.emptyResult
- } else {
- new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows)
- }
-
- execution.PhysicalRDD(projections.map(_.toAttribute), unionedRows)
- }
-
- private def mergeWithPartitionValues(
- schema: StructType,
- requiredColumns: Array[String],
- partitionColumns: Array[String],
- partitionValues: InternalRow,
- dataRows: RDD[InternalRow]): RDD[InternalRow] = {
- val nonPartitionColumns = requiredColumns.filterNot(partitionColumns.contains)
-
- // If output columns contain any partition column(s), we need to merge scanned data
- // columns and requested partition columns to form the final result.
- if (!requiredColumns.sameElements(nonPartitionColumns)) {
- val mergers = requiredColumns.zipWithIndex.map { case (name, index) =>
- // To see whether the `index`-th column is a partition column...
- val i = partitionColumns.indexOf(name)
- if (i != -1) {
- // If yes, gets column value from partition values.
- (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
- mutableRow(ordinal) = partitionValues(i)
- }
- } else {
- // Otherwise, inherits the value from scanned data.
- val i = nonPartitionColumns.indexOf(name)
- (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
- mutableRow(ordinal) = dataRow(i)
- }
- }
- }
-
- // Since we know for sure that this closure is serializable, we can avoid the overhead
- // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally
- // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718).
- val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[InternalRow]) => {
- val dataTypes = requiredColumns.map(schema(_).dataType)
- val mutableRow = new SpecificMutableRow(dataTypes)
- iterator.map { dataRow =>
- var i = 0
- while (i < mutableRow.length) {
- mergers(i)(mutableRow, dataRow, i)
- i += 1
- }
- mutableRow.asInstanceOf[InternalRow]
- }
- }
-
- // This is an internal RDD whose call site the user should not be concerned with
- // Since we create many of these (one per partition), the time spent on computing
- // the call site may add up.
- Utils.withDummyCallSite(dataRows.sparkContext) {
- new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false)
- }
-
- } else {
- dataRows
- }
- }
-
- protected def prunePartitions(
- predicates: Seq[Expression],
- partitionSpec: PartitionSpec): Seq[Partition] = {
- val PartitionSpec(partitionColumns, partitions) = partitionSpec
- val partitionColumnNames = partitionColumns.map(_.name).toSet
- val partitionPruningPredicates = predicates.filter {
- _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
- }
-
- if (partitionPruningPredicates.nonEmpty) {
- val predicate =
- partitionPruningPredicates
- .reduceOption(expressions.And)
- .getOrElse(Literal(true))
-
- val boundPredicate = InterpretedPredicate.create(predicate.transform {
- case a: AttributeReference =>
- val index = partitionColumns.indexWhere(a.name == _.name)
- BoundReference(index, partitionColumns(index).dataType, nullable = true)
- })
-
- partitions.filter { case Partition(values, _) => boundPredicate(values) }
- } else {
- partitions
- }
- }
-
- // Based on Public API.
- protected def pruneFilterProject(
- relation: LogicalRelation,
- projects: Seq[NamedExpression],
- filterPredicates: Seq[Expression],
- scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow]) = {
- pruneFilterProjectRaw(
- relation,
- projects,
- filterPredicates,
- (requestedColumns, pushedFilters) => {
- scanBuilder(requestedColumns, selectFilters(pushedFilters).toArray)
- })
- }
-
- // Based on Catalyst expressions.
- protected def pruneFilterProjectRaw(
- relation: LogicalRelation,
- projects: Seq[NamedExpression],
- filterPredicates: Seq[Expression],
- scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[InternalRow]) = {
-
- val projectSet = AttributeSet(projects.flatMap(_.references))
- val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
- val filterCondition = filterPredicates.reduceLeftOption(expressions.And)
-
- val pushedFilters = filterPredicates.map { _ transform {
- case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
- }}
-
- if (projects.map(_.toAttribute) == projects &&
- projectSet.size == projects.size &&
- filterSet.subsetOf(projectSet)) {
- // When it is possible to just use column pruning to get the right projection and
- // when the columns of this projection are enough to evaluate all filter conditions,
- // just do a scan followed by a filter, with no extra project.
- val requestedColumns =
- projects.asInstanceOf[Seq[Attribute]] // Safe due to if above.
- .map(relation.attributeMap) // Match original case of attributes.
-
- val scan = execution.PhysicalRDD(projects.map(_.toAttribute),
- scanBuilder(requestedColumns, pushedFilters))
- filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
- } else {
- val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq
-
- val scan = execution.PhysicalRDD(requestedColumns,
- scanBuilder(requestedColumns, pushedFilters))
- execution.Project(projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
- }
- }
-
- /**
- * Convert RDD of Row into RDD of InternalRow with objects in catalyst types
- */
- private[this] def toCatalystRDD(
- relation: LogicalRelation,
- output: Seq[Attribute],
- rdd: RDD[Row]): RDD[InternalRow] = {
- if (relation.relation.needConversion) {
- execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
- } else {
- rdd.map(_.asInstanceOf[InternalRow])
- }
- }
-
- /**
- * Convert RDD of Row into RDD of InternalRow with objects in catalyst types
- */
- private[this] def toCatalystRDD(relation: LogicalRelation, rdd: RDD[Row]): RDD[InternalRow] = {
- toCatalystRDD(relation, relation.output, rdd)
- }
-
- /**
- * Selects Catalyst predicate [[Expression]]s which are convertible into data source [[Filter]]s,
- * and convert them.
- */
- protected[sql] def selectFilters(filters: Seq[Expression]) = {
- def translate(predicate: Expression): Option[Filter] = predicate match {
- case expressions.EqualTo(a: Attribute, Literal(v, _)) =>
- Some(sources.EqualTo(a.name, v))
- case expressions.EqualTo(Literal(v, _), a: Attribute) =>
- Some(sources.EqualTo(a.name, v))
-
- case expressions.GreaterThan(a: Attribute, Literal(v, _)) =>
- Some(sources.GreaterThan(a.name, v))
- case expressions.GreaterThan(Literal(v, _), a: Attribute) =>
- Some(sources.LessThan(a.name, v))
-
- case expressions.LessThan(a: Attribute, Literal(v, _)) =>
- Some(sources.LessThan(a.name, v))
- case expressions.LessThan(Literal(v, _), a: Attribute) =>
- Some(sources.GreaterThan(a.name, v))
-
- case expressions.GreaterThanOrEqual(a: Attribute, Literal(v, _)) =>
- Some(sources.GreaterThanOrEqual(a.name, v))
- case expressions.GreaterThanOrEqual(Literal(v, _), a: Attribute) =>
- Some(sources.LessThanOrEqual(a.name, v))
-
- case expressions.LessThanOrEqual(a: Attribute, Literal(v, _)) =>
- Some(sources.LessThanOrEqual(a.name, v))
- case expressions.LessThanOrEqual(Literal(v, _), a: Attribute) =>
- Some(sources.GreaterThanOrEqual(a.name, v))
-
- case expressions.InSet(a: Attribute, set) =>
- Some(sources.In(a.name, set.toArray))
-
- case expressions.IsNull(a: Attribute) =>
- Some(sources.IsNull(a.name))
- case expressions.IsNotNull(a: Attribute) =>
- Some(sources.IsNotNull(a.name))
-
- case expressions.And(left, right) =>
- (translate(left) ++ translate(right)).reduceOption(sources.And)
-
- case expressions.Or(left, right) =>
- for {
- leftFilter <- translate(left)
- rightFilter <- translate(right)
- } yield sources.Or(leftFilter, rightFilter)
-
- case expressions.Not(child) =>
- translate(child).map(sources.Not)
-
- case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
- Some(sources.StringStartsWith(a.name, v.toString))
-
- case expressions.EndsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
- Some(sources.StringEndsWith(a.name, v.toString))
-
- case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) =>
- Some(sources.StringContains(a.name, v.toString))
-
- case _ => None
- }
-
- filters.flatMap(translate)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/sources/LogicalRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/LogicalRelation.scala
deleted file mode 100644
index f374abf..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/LogicalRelation.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.sources
-
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeMap}
-import org.apache.spark.sql.catalyst.plans.logical.{Statistics, LeafNode, LogicalPlan}
-
-/**
- * Used to link a [[BaseRelation]] in to a logical query plan.
- */
-private[sql] case class LogicalRelation(relation: BaseRelation)
- extends LeafNode
- with MultiInstanceRelation {
-
- override val output: Seq[AttributeReference] = relation.schema.toAttributes
-
- // Logical Relations are distinct if they have different output for the sake of transformations.
- override def equals(other: Any): Boolean = other match {
- case l @ LogicalRelation(otherRelation) => relation == otherRelation && output == l.output
- case _ => false
- }
-
- override def hashCode: Int = {
- com.google.common.base.Objects.hashCode(relation, output)
- }
-
- override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match {
- case LogicalRelation(otherRelation) => relation == otherRelation
- case _ => false
- }
-
- @transient override lazy val statistics: Statistics = Statistics(
- sizeInBytes = BigInt(relation.sizeInBytes)
- )
-
- /** Used to lookup original attribute capitalization */
- val attributeMap: AttributeMap[AttributeReference] = AttributeMap(output.map(o => (o, o)))
-
- def newInstance(): this.type = LogicalRelation(relation).asInstanceOf[this.type]
-
- override def simpleString: String = s"Relation[${output.mkString(",")}] $relation"
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
deleted file mode 100644
index 8b2a45d..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources
-
-import java.lang.{Double => JDouble, Float => JFloat, Integer => JInteger, Long => JLong}
-import java.math.{BigDecimal => JBigDecimal}
-
-import scala.collection.mutable.ArrayBuffer
-import scala.util.Try
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.util.Shell
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
-import org.apache.spark.sql.types._
-
-private[sql] case class Partition(values: InternalRow, path: String)
-
-private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition])
-
-private[sql] object PartitionSpec {
- val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[Partition])
-}
-
-private[sql] object PartitioningUtils {
- // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't
- // depend on Hive.
- private[sql] val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"
-
- private[sql] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) {
- require(columnNames.size == literals.size)
- }
-
- /**
- * Given a group of qualified paths, tries to parse them and returns a partition specification.
- * For example, given:
- * {{{
- * hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14
- * hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28
- * }}}
- * it returns:
- * {{{
- * PartitionSpec(
- * partitionColumns = StructType(
- * StructField(name = "a", dataType = IntegerType, nullable = true),
- * StructField(name = "b", dataType = StringType, nullable = true),
- * StructField(name = "c", dataType = DoubleType, nullable = true)),
- * partitions = Seq(
- * Partition(
- * values = Row(1, "hello", 3.14),
- * path = "hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14"),
- * Partition(
- * values = Row(2, "world", 6.28),
- * path = "hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28")))
- * }}}
- */
- private[sql] def parsePartitions(
- paths: Seq[Path],
- defaultPartitionName: String,
- typeInference: Boolean): PartitionSpec = {
- // First, we need to parse every partition's path and see if we can find partition values.
- val pathsWithPartitionValues = paths.flatMap { path =>
- parsePartition(path, defaultPartitionName, typeInference).map(path -> _)
- }
-
- if (pathsWithPartitionValues.isEmpty) {
- // This dataset is not partitioned.
- PartitionSpec.emptySpec
- } else {
- // This dataset is partitioned. We need to check whether all partitions have the same
- // partition columns and resolve potential type conflicts.
- val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
-
- // Creates the StructType which represents the partition columns.
- val fields = {
- val PartitionValues(columnNames, literals) = resolvedPartitionValues.head
- columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
- // We always assume partition columns are nullable since we've no idea whether null values
- // will be appended in the future.
- StructField(name, dataType, nullable = true)
- }
- }
-
- // Finally, we create `Partition`s based on paths and resolved partition values.
- val partitions = resolvedPartitionValues.zip(pathsWithPartitionValues).map {
- case (PartitionValues(_, literals), (path, _)) =>
- Partition(InternalRow.fromSeq(literals.map(_.value)), path.toString)
- }
-
- PartitionSpec(StructType(fields), partitions)
- }
- }
-
- /**
- * Parses a single partition, returns column names and values of each partition column. For
- * example, given:
- * {{{
- * path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
- * }}}
- * it returns:
- * {{{
- * PartitionValues(
- * Seq("a", "b", "c"),
- * Seq(
- * Literal.create(42, IntegerType),
- * Literal.create("hello", StringType),
- * Literal.create(3.14, FloatType)))
- * }}}
- */
- private[sql] def parsePartition(
- path: Path,
- defaultPartitionName: String,
- typeInference: Boolean): Option[PartitionValues] = {
- val columns = ArrayBuffer.empty[(String, Literal)]
- // Old Hadoop versions don't have `Path.isRoot`
- var finished = path.getParent == null
- var chopped = path
-
- while (!finished) {
- // Sometimes (e.g., when speculative task is enabled), temporary directories may be left
- // uncleaned. Here we simply ignore them.
- if (chopped.getName.toLowerCase == "_temporary") {
- return None
- }
-
- val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference)
- maybeColumn.foreach(columns += _)
- chopped = chopped.getParent
- finished = maybeColumn.isEmpty || chopped.getParent == null
- }
-
- if (columns.isEmpty) {
- None
- } else {
- val (columnNames, values) = columns.reverse.unzip
- Some(PartitionValues(columnNames, values))
- }
- }
-
- private def parsePartitionColumn(
- columnSpec: String,
- defaultPartitionName: String,
- typeInference: Boolean): Option[(String, Literal)] = {
- val equalSignIndex = columnSpec.indexOf('=')
- if (equalSignIndex == -1) {
- None
- } else {
- val columnName = columnSpec.take(equalSignIndex)
- assert(columnName.nonEmpty, s"Empty partition column name in '$columnSpec'")
-
- val rawColumnValue = columnSpec.drop(equalSignIndex + 1)
- assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")
-
- val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName, typeInference)
- Some(columnName -> literal)
- }
- }
-
- /**
- * Resolves possible type conflicts between partitions by up-casting "lower" types. The up-
- * casting order is:
- * {{{
- * NullType ->
- * IntegerType -> LongType ->
- * DoubleType -> DecimalType.Unlimited ->
- * StringType
- * }}}
- */
- private[sql] def resolvePartitions(
- pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
- if (pathsWithPartitionValues.isEmpty) {
- Seq.empty
- } else {
- val distinctPartColNames = pathsWithPartitionValues.map(_._2.columnNames).distinct
- assert(
- distinctPartColNames.size == 1,
- listConflictingPartitionColumns(pathsWithPartitionValues))
-
- // Resolves possible type conflicts for each column
- val values = pathsWithPartitionValues.map(_._2)
- val columnCount = values.head.columnNames.size
- val resolvedValues = (0 until columnCount).map { i =>
- resolveTypeConflicts(values.map(_.literals(i)))
- }
-
- // Fills resolved literals back to each partition
- values.zipWithIndex.map { case (d, index) =>
- d.copy(literals = resolvedValues.map(_(index)))
- }
- }
- }
-
- private[sql] def listConflictingPartitionColumns(
- pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = {
- val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct
-
- def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
- seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value })
-
- val partColNamesToPaths = groupByKey(pathWithPartitionValues.map {
- case (path, partValues) => partValues.columnNames -> path
- })
-
- val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map {
- case (names, index) =>
- s"Partition column name list #$index: $names"
- }
-
- // Lists out those non-leaf partition directories that also contain files
- val suspiciousPaths = distinctPartColNames.sortBy(_.length).flatMap(partColNamesToPaths)
-
- s"Conflicting partition column names detected:\n" +
- distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") +
- "For partitioned table directories, data files should only live in leaf directories.\n" +
- "And directories at the same level should have the same partition column name.\n" +
- "Please check the following directories for unexpected files or " +
- "inconsistent partition column names:\n" +
- suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
- }
-
- /**
- * Converts a string to a [[Literal]] with automatic type inference. Currently only supports
- * [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType.Unlimited]], and
- * [[StringType]].
- */
- private[sql] def inferPartitionColumnValue(
- raw: String,
- defaultPartitionName: String,
- typeInference: Boolean): Literal = {
- if (typeInference) {
- // First tries integral types
- Try(Literal.create(Integer.parseInt(raw), IntegerType))
- .orElse(Try(Literal.create(JLong.parseLong(raw), LongType)))
- // Then falls back to fractional types
- .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
- .orElse(Try(Literal.create(new JBigDecimal(raw), DecimalType.Unlimited)))
- // Then falls back to string
- .getOrElse {
- if (raw == defaultPartitionName) {
- Literal.create(null, NullType)
- } else {
- Literal.create(unescapePathName(raw), StringType)
- }
- }
- } else {
- if (raw == defaultPartitionName) {
- Literal.create(null, NullType)
- } else {
- Literal.create(unescapePathName(raw), StringType)
- }
- }
- }
-
- private val upCastingOrder: Seq[DataType] =
- Seq(NullType, IntegerType, LongType, FloatType, DoubleType, DecimalType.Unlimited, StringType)
-
- /**
- * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
- * types.
- */
- private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = {
- val desiredType = {
- val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
- // Falls back to string if all values of this column are null or empty string
- if (topType == NullType) StringType else topType
- }
-
- literals.map { case l @ Literal(_, dataType) =>
- Literal.create(Cast(l, desiredType).eval(), desiredType)
- }
- }
-
- //////////////////////////////////////////////////////////////////////////////////////////////////
- // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils).
- //////////////////////////////////////////////////////////////////////////////////////////////////
-
- val charToEscape = {
- val bitSet = new java.util.BitSet(128)
-
- /**
- * ASCII 01-1F are HTTP control characters that need to be escaped.
- * \u000A and \u000D are \n and \r, respectively.
- */
- val clist = Array(
- '\u0001', '\u0002', '\u0003', '\u0004', '\u0005', '\u0006', '\u0007', '\u0008', '\u0009',
- '\n', '\u000B', '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', '\u0013',
- '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', '\u001A', '\u001B', '\u001C',
- '\u001D', '\u001E', '\u001F', '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F',
- '{', '[', ']', '^')
-
- clist.foreach(bitSet.set(_))
-
- if (Shell.WINDOWS) {
- Array(' ', '<', '>', '|').foreach(bitSet.set(_))
- }
-
- bitSet
- }
-
- def needsEscaping(c: Char): Boolean = {
- c >= 0 && c < charToEscape.size() && charToEscape.get(c)
- }
-
- def escapePathName(path: String): String = {
- val builder = new StringBuilder()
- path.foreach { c =>
- if (needsEscaping(c)) {
- builder.append('%')
- builder.append(f"${c.asInstanceOf[Int]}%02x")
- } else {
- builder.append(c)
- }
- }
-
- builder.toString()
- }
-
- def unescapePathName(path: String): String = {
- val sb = new StringBuilder
- var i = 0
-
- while (i < path.length) {
- val c = path.charAt(i)
- if (c == '%' && i + 2 < path.length) {
- val code: Int = try {
- Integer.valueOf(path.substring(i + 1, i + 3), 16)
- } catch { case e: Exception =>
- -1: Integer
- }
- if (code >= 0) {
- sb.append(code.asInstanceOf[Char])
- i += 3
- } else {
- sb.append(c)
- i += 1
- }
- } else {
- sb.append(c)
- i += 1
- }
- }
-
- sb.toString()
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
deleted file mode 100644
index 2bdc341..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources
-
-import java.text.SimpleDateFormat
-import java.util.Date
-
-import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
-import org.apache.spark.broadcast.Broadcast
-
-import org.apache.spark.{Partition => SparkPartition, _}
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.executor.DataReadMethod
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
-import org.apache.spark.rdd.{RDD, HadoopRDD}
-import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{SerializableConfiguration, Utils}
-
-import scala.reflect.ClassTag
-
-private[spark] class SqlNewHadoopPartition(
- rddId: Int,
- val index: Int,
- @transient rawSplit: InputSplit with Writable)
- extends SparkPartition {
-
- val serializableHadoopSplit = new SerializableWritable(rawSplit)
-
- override def hashCode(): Int = 41 * (41 + rddId) + index
-}
-
-/**
- * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
- * sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`).
- * It is based on [[org.apache.spark.rdd.NewHadoopRDD]]. It has three additions.
- * 1. A shared broadcast Hadoop Configuration.
- * 2. An optional closure `initDriverSideJobFuncOpt` that set configurations at the driver side
- * to the shared Hadoop Configuration.
- * 3. An optional closure `initLocalJobFuncOpt` that set configurations at both the driver side
- * and the executor side to the shared Hadoop Configuration.
- *
- * Note: This is RDD is basically a cloned version of [[org.apache.spark.rdd.NewHadoopRDD]] with
- * changes based on [[org.apache.spark.rdd.HadoopRDD]]. In future, this functionality will be
- * folded into core.
- */
-private[sql] class SqlNewHadoopRDD[K, V](
- @transient sc : SparkContext,
- broadcastedConf: Broadcast[SerializableConfiguration],
- @transient initDriverSideJobFuncOpt: Option[Job => Unit],
- initLocalJobFuncOpt: Option[Job => Unit],
- inputFormatClass: Class[_ <: InputFormat[K, V]],
- keyClass: Class[K],
- valueClass: Class[V])
- extends RDD[(K, V)](sc, Nil)
- with SparkHadoopMapReduceUtil
- with Logging {
-
- protected def getJob(): Job = {
- val conf: Configuration = broadcastedConf.value.value
- // "new Job" will make a copy of the conf. Then, it is
- // safe to mutate conf properties with initLocalJobFuncOpt
- // and initDriverSideJobFuncOpt.
- val newJob = new Job(conf)
- initLocalJobFuncOpt.map(f => f(newJob))
- newJob
- }
-
- def getConf(isDriverSide: Boolean): Configuration = {
- val job = getJob()
- if (isDriverSide) {
- initDriverSideJobFuncOpt.map(f => f(job))
- }
- job.getConfiguration
- }
-
- private val jobTrackerId: String = {
- val formatter = new SimpleDateFormat("yyyyMMddHHmm")
- formatter.format(new Date())
- }
-
- @transient protected val jobId = new JobID(jobTrackerId, id)
-
- override def getPartitions: Array[SparkPartition] = {
- val conf = getConf(isDriverSide = true)
- val inputFormat = inputFormatClass.newInstance
- inputFormat match {
- case configurable: Configurable =>
- configurable.setConf(conf)
- case _ =>
- }
- val jobContext = newJobContext(conf, jobId)
- val rawSplits = inputFormat.getSplits(jobContext).toArray
- val result = new Array[SparkPartition](rawSplits.size)
- for (i <- 0 until rawSplits.size) {
- result(i) =
- new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
- }
- result
- }
-
- override def compute(
- theSplit: SparkPartition,
- context: TaskContext): InterruptibleIterator[(K, V)] = {
- val iter = new Iterator[(K, V)] {
- val split = theSplit.asInstanceOf[SqlNewHadoopPartition]
- logInfo("Input split: " + split.serializableHadoopSplit)
- val conf = getConf(isDriverSide = false)
-
- val inputMetrics = context.taskMetrics
- .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
-
- // Find a function that will return the FileSystem bytes read by this thread. Do this before
- // creating RecordReader, because RecordReader's constructor might read some bytes
- val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
- split.serializableHadoopSplit.value match {
- case _: FileSplit | _: CombineFileSplit =>
- SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
- case _ => None
- }
- }
- inputMetrics.setBytesReadCallback(bytesReadCallback)
-
- val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
- val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
- val format = inputFormatClass.newInstance
- format match {
- case configurable: Configurable =>
- configurable.setConf(conf)
- case _ =>
- }
- val reader = format.createRecordReader(
- split.serializableHadoopSplit.value, hadoopAttemptContext)
- reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
-
- // Register an on-task-completion callback to close the input stream.
- context.addTaskCompletionListener(context => close())
- var havePair = false
- var finished = false
- var recordsSinceMetricsUpdate = 0
-
- override def hasNext: Boolean = {
- if (!finished && !havePair) {
- finished = !reader.nextKeyValue
- havePair = !finished
- }
- !finished
- }
-
- override def next(): (K, V) = {
- if (!hasNext) {
- throw new java.util.NoSuchElementException("End of stream")
- }
- havePair = false
- if (!finished) {
- inputMetrics.incRecordsRead(1)
- }
- (reader.getCurrentKey, reader.getCurrentValue)
- }
-
- private def close() {
- try {
- reader.close()
- if (bytesReadCallback.isDefined) {
- inputMetrics.updateBytesRead()
- } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
- split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
- // If we can't get the bytes read from the FS stats, fall back to the split size,
- // which may be inaccurate.
- try {
- inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
- } catch {
- case e: java.io.IOException =>
- logWarning("Unable to get input size to set InputMetrics for task", e)
- }
- }
- } catch {
- case e: Exception => {
- if (!Utils.inShutdown()) {
- logWarning("Exception in RecordReader.close()", e)
- }
- }
- }
- }
- }
- new InterruptibleIterator(context, iter)
- }
-
- /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
- @DeveloperApi
- def mapPartitionsWithInputSplit[U: ClassTag](
- f: (InputSplit, Iterator[(K, V)]) => Iterator[U],
- preservesPartitioning: Boolean = false): RDD[U] = {
- new NewHadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
- }
-
- override def getPreferredLocations(hsplit: SparkPartition): Seq[String] = {
- val split = hsplit.asInstanceOf[SqlNewHadoopPartition].serializableHadoopSplit.value
- val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
- case Some(c) =>
- try {
- val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
- Some(HadoopRDD.convertSplitLocationInfo(infos))
- } catch {
- case e : Exception =>
- logDebug("Failed to use InputSplit#getLocationInfo.", e)
- None
- }
- case None => None
- }
- locs.getOrElse(split.getLocations.filter(_ != "localhost"))
- }
-
- override def persist(storageLevel: StorageLevel): this.type = {
- if (storageLevel.deserialized) {
- logWarning("Caching NewHadoopRDDs as deserialized objects usually leads to undesired" +
- " behavior because Hadoop's RecordReader reuses the same Writable object for all records." +
- " Use a map transformation to make copies of the records.")
- }
- super.persist(storageLevel)
- }
-}
-
-private[spark] object SqlNewHadoopRDD {
- /**
- * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
- * the given function rather than the index of the partition.
- */
- private[spark] class NewHadoopMapPartitionsWithSplitRDD[U: ClassTag, T: ClassTag](
- prev: RDD[T],
- f: (InputSplit, Iterator[T]) => Iterator[U],
- preservesPartitioning: Boolean = false)
- extends RDD[U](prev) {
-
- override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
-
- override def getPartitions: Array[SparkPartition] = firstParent[T].partitions
-
- override def compute(split: SparkPartition, context: TaskContext): Iterator[U] = {
- val partition = split.asInstanceOf[SqlNewHadoopPartition]
- val inputSplit = partition.serializableHadoopSplit.value
- f(inputSplit, firstParent[T].iterator(split, context))
- }
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org