Posted to commits@spark.apache.org by rx...@apache.org on 2016/03/08 00:15:16 UTC

[3/4] spark git commit: [SPARK-13665][SQL] Separate the concerns of HadoopFsRelation

http://git-wip-us.apache.org/repos/asf/spark/blob/e720dda4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index 8b773dd..0937a21 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 
-private[json] object InferSchema {
+private[sql] object InferSchema {
 
   /**
    * Infer the type of a collection of json records in three stages:

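Widening InferSchema from private[json] to private[sql] lets other code in the SQL module drive JSON schema inference directly. As a rough sketch (not part of this patch; the helper object and its package placement are hypothetical), a caller living under org.apache.spark.sql could now invoke it with the same three arguments the JSON DefaultSource passes further down:

package org.apache.spark.sql.execution.datasources

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.datasources.json.{InferSchema, JSONOptions}
import org.apache.spark.sql.types.StructType

// Hypothetical helper; the infer(...) arguments mirror the call made by the
// JSON DefaultSource in the next file of this patch.
object JsonSchemaProbe {
  def probe(sqlContext: SQLContext, json: RDD[String]): StructType = {
    InferSchema.infer(
      json,
      sqlContext.conf.columnNameOfCorruptRecord,  // column name used for unparseable records
      new JSONOptions(Map.empty[String, String]))
  }
}
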
http://git-wip-us.apache.org/repos/asf/spark/blob/e720dda4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 2eba52f..497e3c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -38,101 +38,76 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.collection.BitSet
 
-
-class DefaultSource extends BucketedHadoopFsRelationProvider with DataSourceRegister {
+class DefaultSource extends FileFormat with DataSourceRegister {
 
   override def shortName(): String = "json"
 
-  override def createRelation(
+  override def inferSchema(
       sqlContext: SQLContext,
-      paths: Array[String],
-      dataSchema: Option[StructType],
-      partitionColumns: Option[StructType],
-      bucketSpec: Option[BucketSpec],
-      parameters: Map[String, String]): HadoopFsRelation = {
-
-    new JSONRelation(
-      inputRDD = None,
-      maybeDataSchema = dataSchema,
-      maybePartitionSpec = None,
-      userDefinedPartitionColumns = partitionColumns,
-      maybeBucketSpec = bucketSpec,
-      paths = paths,
-      parameters = parameters)(sqlContext)
-  }
-}
-
-private[sql] class JSONRelation(
-    val inputRDD: Option[RDD[String]],
-    val maybeDataSchema: Option[StructType],
-    val maybePartitionSpec: Option[PartitionSpec],
-    override val userDefinedPartitionColumns: Option[StructType],
-    override val maybeBucketSpec: Option[BucketSpec] = None,
-    override val paths: Array[String] = Array.empty[String],
-    parameters: Map[String, String] = Map.empty[String, String])
-    (@transient val sqlContext: SQLContext)
-  extends HadoopFsRelation(maybePartitionSpec, parameters) {
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = {
+    if (files.isEmpty) {
+      None
+    } else {
+      val parsedOptions: JSONOptions = new JSONOptions(options)
+      val jsonFiles = files.filterNot { status =>
+        val name = status.getPath.getName
+        name.startsWith("_") || name.startsWith(".")
+      }.toArray
 
-  val options: JSONOptions = new JSONOptions(parameters)
+      val jsonSchema = InferSchema.infer(
+        createBaseRdd(sqlContext, jsonFiles),
+        sqlContext.conf.columnNameOfCorruptRecord,
+        parsedOptions)
+      checkConstraints(jsonSchema)
 
-  /** Constraints to be imposed on schema to be stored. */
-  private def checkConstraints(schema: StructType): Unit = {
-    if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
-      val duplicateColumns = schema.fieldNames.groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => "\"" + x + "\""
-      }.mkString(", ")
-      throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
-        s"cannot save to JSON format")
+      Some(jsonSchema)
     }
   }
 
-  override val needConversion: Boolean = false
-
-  private def createBaseRdd(inputPaths: Array[FileStatus]): RDD[String] = {
-    val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
+  override def prepareWrite(
+      sqlContext: SQLContext,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
     val conf = job.getConfiguration
-
-    val paths = inputPaths.map(_.getPath)
-
-    if (paths.nonEmpty) {
-      FileInputFormat.setInputPaths(job, paths: _*)
+    val parsedOptions: JSONOptions = new JSONOptions(options)
+    parsedOptions.compressionCodec.foreach { codec =>
+      CompressionCodecs.setCodecConfiguration(conf, codec)
     }
 
-    sqlContext.sparkContext.hadoopRDD(
-      conf.asInstanceOf[JobConf],
-      classOf[TextInputFormat],
-      classOf[LongWritable],
-      classOf[Text]).map(_._2.toString) // get the text line
-  }
-
-  override lazy val dataSchema: StructType = {
-    val jsonSchema = maybeDataSchema.getOrElse {
-      val files = cachedLeafStatuses().filterNot { status =>
-        val name = status.getPath.getName
-        name.startsWith("_") || name.startsWith(".")
-      }.toArray
-      InferSchema.infer(
-        inputRDD.getOrElse(createBaseRdd(files)),
-        sqlContext.conf.columnNameOfCorruptRecord,
-        options)
+    new OutputWriterFactory {
+      override def newInstance(
+          path: String,
+          bucketId: Option[Int],
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
+        new JsonOutputWriter(path, bucketId, dataSchema, context)
+      }
     }
-    checkConstraints(jsonSchema)
-
-    jsonSchema
   }
 
-  override private[sql] def buildInternalScan(
+  override def buildInternalScan(
+      sqlContext: SQLContext,
+      dataSchema: StructType,
       requiredColumns: Array[String],
       filters: Array[Filter],
-      inputPaths: Array[FileStatus],
-      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
+      bucketSet: Option[BitSet],
+      inputFiles: Array[FileStatus],
+      broadcastedConf: Broadcast[SerializableConfiguration],
+      options: Map[String, String]): RDD[InternalRow] = {
+    // TODO: Filter files for all formats before calling buildInternalScan.
+    val jsonFiles = inputFiles.filterNot(_.getPath.getName startsWith "_")
+
+    val parsedOptions: JSONOptions = new JSONOptions(options)
     val requiredDataSchema = StructType(requiredColumns.map(dataSchema(_)))
     val rows = JacksonParser.parse(
-      inputRDD.getOrElse(createBaseRdd(inputPaths)),
+      createBaseRdd(sqlContext, jsonFiles),
       requiredDataSchema,
       sqlContext.conf.columnNameOfCorruptRecord,
-      options)
+      parsedOptions)
 
     rows.mapPartitions { iterator =>
       val unsafeProjection = UnsafeProjection.create(requiredDataSchema)
@@ -140,43 +115,36 @@ private[sql] class JSONRelation(
     }
   }
 
-  override def equals(other: Any): Boolean = other match {
-    case that: JSONRelation =>
-      ((inputRDD, that.inputRDD) match {
-        case (Some(thizRdd), Some(thatRdd)) => thizRdd eq thatRdd
-        case (None, None) => true
-        case _ => false
-      }) && paths.toSet == that.paths.toSet &&
-        dataSchema == that.dataSchema &&
-        schema == that.schema
-    case _ => false
-  }
+  private def createBaseRdd(sqlContext: SQLContext, inputPaths: Array[FileStatus]): RDD[String] = {
+    val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
+    val conf = job.getConfiguration
 
-  override def hashCode(): Int = {
-    Objects.hashCode(
-      inputRDD,
-      paths.toSet,
-      dataSchema,
-      schema,
-      partitionColumns)
-  }
+    val paths = inputPaths.map(_.getPath)
 
-  override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = {
-    val conf = job.getConfiguration
-    options.compressionCodec.foreach { codec =>
-      CompressionCodecs.setCodecConfiguration(conf, codec)
+    if (paths.nonEmpty) {
+      FileInputFormat.setInputPaths(job, paths: _*)
     }
 
-    new BucketedOutputWriterFactory {
-      override def newInstance(
-          path: String,
-          bucketId: Option[Int],
-          dataSchema: StructType,
-          context: TaskAttemptContext): OutputWriter = {
-        new JsonOutputWriter(path, bucketId, dataSchema, context)
-      }
+    sqlContext.sparkContext.hadoopRDD(
+      conf.asInstanceOf[JobConf],
+      classOf[TextInputFormat],
+      classOf[LongWritable],
+      classOf[Text]).map(_._2.toString) // get the text line
+  }
+
+  /** Constraints to be imposed on schema to be stored. */
+  private def checkConstraints(schema: StructType): Unit = {
+    if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
+      val duplicateColumns = schema.fieldNames.groupBy(identity).collect {
+        case (x, ys) if ys.length > 1 => "\"" + x + "\""
+      }.mkString(", ")
+      throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
+          s"cannot save to JSON format")
     }
   }
+
+  override def toString: String = "JSON"
+  override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource]
 }
 
 private[json] class JsonOutputWriter(

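With JSONRelation gone, the JSON source is a stateless FileFormat, so reading its schema becomes an explicit call rather than a lazy field on a relation. A rough usage sketch (not from the patch; the input path is a placeholder and a SQLContext named sqlContext is assumed to be in scope, e.g. in spark-shell):

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.sql.execution.datasources.json.DefaultSource

// List the input files ourselves; inferSchema only needs FileStatus entries.
val fs = FileSystem.get(sqlContext.sparkContext.hadoopConfiguration)
val files: Seq[FileStatus] =
  fs.listStatus(new Path("/tmp/events")).toSeq   // hypothetical input directory

val json = new DefaultSource()
// Returns None when no usable files are found, per the inferSchema contract.
val inferred = json.inferSchema(sqlContext, options = Map.empty[String, String], files = files)
inferred.foreach(schema => println(schema.treeString))
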
http://git-wip-us.apache.org/repos/asf/spark/blob/e720dda4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index b8af832..82404b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.{Failure, Try}
 
-import com.google.common.base.Objects
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Writable
@@ -51,193 +50,23 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.collection.BitSet
 
-private[sql] class DefaultSource extends BucketedHadoopFsRelationProvider with DataSourceRegister {
 
-  override def shortName(): String = "parquet"
-
-  override def createRelation(
-      sqlContext: SQLContext,
-      paths: Array[String],
-      schema: Option[StructType],
-      partitionColumns: Option[StructType],
-      bucketSpec: Option[BucketSpec],
-      parameters: Map[String, String]): HadoopFsRelation = {
-    new ParquetRelation(paths, schema, None, partitionColumns, bucketSpec, parameters)(sqlContext)
-  }
-}
-
-// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
-private[sql] class ParquetOutputWriter(
-    path: String,
-    bucketId: Option[Int],
-    context: TaskAttemptContext)
-  extends OutputWriter {
-
-  private val recordWriter: RecordWriter[Void, InternalRow] = {
-    val outputFormat = {
-      new ParquetOutputFormat[InternalRow]() {
-        // Here we override `getDefaultWorkFile` for two reasons:
-        //
-        //  1. To allow appending.  We need to generate unique output file names to avoid
-        //     overwriting existing files (either exist before the write job, or are just written
-        //     by other tasks within the same write job).
-        //
-        //  2. To allow dynamic partitioning.  Default `getDefaultWorkFile` uses
-        //     `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
-        //     partitions in the case of dynamic partitioning.
-        override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-          val configuration = context.getConfiguration
-          val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
-          val taskAttemptId = context.getTaskAttemptID
-          val split = taskAttemptId.getTaskID.getId
-          val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
-          new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
-        }
-      }
-    }
-
-    outputFormat.getRecordWriter(context)
-  }
+private[sql] class DefaultSource extends FileFormat with DataSourceRegister with Logging {
 
-  override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
-
-  override protected[sql] def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row)
-
-  override def close(): Unit = recordWriter.close(context)
-}
-
-private[sql] class ParquetRelation(
-    override val paths: Array[String],
-    private val maybeDataSchema: Option[StructType],
-    // This is for metastore conversion.
-    private val maybePartitionSpec: Option[PartitionSpec],
-    override val userDefinedPartitionColumns: Option[StructType],
-    override val maybeBucketSpec: Option[BucketSpec],
-    parameters: Map[String, String])(
-    val sqlContext: SQLContext)
-  extends HadoopFsRelation(maybePartitionSpec, parameters)
-  with Logging {
-
-  private[sql] def this(
-      paths: Array[String],
-      maybeDataSchema: Option[StructType],
-      maybePartitionSpec: Option[PartitionSpec],
-      parameters: Map[String, String])(
-      sqlContext: SQLContext) = {
-    this(
-      paths,
-      maybeDataSchema,
-      maybePartitionSpec,
-      maybePartitionSpec.map(_.partitionColumns),
-      None,
-      parameters)(sqlContext)
-  }
-
-  // Should we merge schemas from all Parquet part-files?
-  private val shouldMergeSchemas =
-    parameters
-      .get(ParquetRelation.MERGE_SCHEMA)
-      .map(_.toBoolean)
-      .getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
-
-  private val mergeRespectSummaries =
-    sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)
-
-  private val maybeMetastoreSchema = parameters
-    .get(ParquetRelation.METASTORE_SCHEMA)
-    .map(DataType.fromJson(_).asInstanceOf[StructType])
-
-  private val compressionCodec: Option[String] = parameters
-    .get("compression")
-    .map { codecName =>
-      // Validate if given compression codec is supported or not.
-      val shortParquetCompressionCodecNames = ParquetRelation.shortParquetCompressionCodecNames
-      if (!shortParquetCompressionCodecNames.contains(codecName.toLowerCase)) {
-        val availableCodecs = shortParquetCompressionCodecNames.keys.map(_.toLowerCase)
-        throw new IllegalArgumentException(s"Codec [$codecName] " +
-          s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
-      }
-      codecName.toLowerCase
-    }
-
-  private lazy val metadataCache: MetadataCache = {
-    val meta = new MetadataCache
-    meta.refresh()
-    meta
-  }
-
-  override def toString: String = {
-    parameters.get(ParquetRelation.METASTORE_TABLE_NAME).map { tableName =>
-      s"${getClass.getSimpleName}: $tableName"
-    }.getOrElse(super.toString)
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case that: ParquetRelation =>
-      val schemaEquality = if (shouldMergeSchemas) {
-        this.shouldMergeSchemas == that.shouldMergeSchemas
-      } else {
-        this.dataSchema == that.dataSchema &&
-          this.schema == that.schema
-      }
-
-      this.paths.toSet == that.paths.toSet &&
-        schemaEquality &&
-        this.maybeDataSchema == that.maybeDataSchema &&
-        this.partitionColumns == that.partitionColumns
-
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    if (shouldMergeSchemas) {
-      Objects.hashCode(
-        Boolean.box(shouldMergeSchemas),
-        paths.toSet,
-        maybeDataSchema,
-        partitionColumns)
-    } else {
-      Objects.hashCode(
-        Boolean.box(shouldMergeSchemas),
-        paths.toSet,
-        dataSchema,
-        schema,
-        maybeDataSchema,
-        partitionColumns)
-    }
-  }
-
-  /** Constraints on schema of dataframe to be stored. */
-  private def checkConstraints(schema: StructType): Unit = {
-    if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
-      val duplicateColumns = schema.fieldNames.groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => "\"" + x + "\""
-      }.mkString(", ")
-      throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
-        s"cannot save to parquet format")
-    }
-  }
+  override def shortName(): String = "parquet"
 
-  override def dataSchema: StructType = {
-    val schema = maybeDataSchema.getOrElse(metadataCache.dataSchema)
-    // check if schema satisfies the constraints
-    // before moving forward
-    checkConstraints(schema)
-    schema
-  }
+  override def toString: String = "ParquetFormat"
 
-  override private[sql] def refresh(): Unit = {
-    super.refresh()
-    metadataCache.refresh()
-  }
+  override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource]
 
-  // Parquet data source always uses Catalyst internal representations.
-  override val needConversion: Boolean = false
-
-  override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum
+  override def prepareWrite(
+      sqlContext: SQLContext,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
 
-  override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = {
     val conf = ContextUtil.getConfiguration(job)
 
     // SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible
@@ -255,11 +84,24 @@ private[sql] class ParquetRelation(
 
     if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
       logInfo("Using default output committer for Parquet: " +
-        classOf[ParquetOutputCommitter].getCanonicalName)
+          classOf[ParquetOutputCommitter].getCanonicalName)
     } else {
       logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName)
     }
 
+    val compressionCodec: Option[String] = options
+      .get("compression")
+      .map { codecName =>
+        // Validate if given compression codec is supported or not.
+        val shortParquetCompressionCodecNames = ParquetRelation.shortParquetCompressionCodecNames
+        if (!shortParquetCompressionCodecNames.contains(codecName.toLowerCase)) {
+          val availableCodecs = shortParquetCompressionCodecNames.keys.map(_.toLowerCase)
+          throw new IllegalArgumentException(s"Codec [$codecName] " +
+              s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
+        }
+        codecName.toLowerCase
+      }
+
     conf.setClass(
       SQLConf.OUTPUT_COMMITTER_CLASS.key,
       committerClass,
@@ -303,7 +145,7 @@ private[sql] class ParquetRelation(
             .getOrElse(sqlContext.conf.parquetCompressionCodec.toLowerCase),
           CompressionCodecName.UNCOMPRESSED).name())
 
-    new BucketedOutputWriterFactory {
+    new OutputWriterFactory {
       override def newInstance(
           path: String,
           bucketId: Option[Int],
@@ -314,11 +156,127 @@ private[sql] class ParquetRelation(
     }
   }
 
+  def inferSchema(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = {
+    // Should we merge schemas from all Parquet part-files?
+    val shouldMergeSchemas =
+      parameters
+          .get(ParquetRelation.MERGE_SCHEMA)
+          .map(_.toBoolean)
+          .getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
+
+    val mergeRespectSummaries =
+      sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)
+
+    val filesByType = splitFiles(files)
+
+    // Sees which file(s) we need to touch in order to figure out the schema.
+    //
+    // Always tries the summary files first if users don't require a merged schema.  In this case,
+    // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
+    // groups information, and could be much smaller for large Parquet files with lots of row
+    // groups.  If no summary file is available, falls back to some random part-file.
+    //
+    // NOTE: Metadata stored in the summary files are merged from all part-files.  However, for
+    // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
+    // how to merge them correctly if some key is associated with different values in different
+    // part-files.  When this happens, Parquet simply gives up generating the summary file.  This
+    // implies that if a summary file presents, then:
+    //
+    //   1. Either all part-files have exactly the same Spark SQL schema, or
+    //   2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
+    //      their schemas may differ from each other).
+    //
+    // Here we tend to be pessimistic and take the second case into account.  Basically this means
+    // we can't trust the summary files if users require a merged schema, and must touch all part-
+    // files to do the merge.
+    val filesToTouch =
+      if (shouldMergeSchemas) {
+        // Also includes summary files, 'cause there might be empty partition directories.
+
+        // If mergeRespectSummaries config is true, we assume that all part-files are the same for
+        // their schema with summary files, so we ignore them when merging schema.
+        // If the config is disabled, which is the default setting, we merge all part-files.
+        // In this mode, we only need to merge schemas contained in all those summary files.
+        // You should enable this configuration only if you are very sure that for the parquet
+        // part-files to read there are corresponding summary files containing correct schema.
+
+        // As filed in SPARK-11500, the order of files to touch is a matter, which might affect
+        // the ordering of the output columns. There are several things to mention here.
+        //
+        //  1. If mergeRespectSummaries config is false, then it merges schemas by reducing from
+        //     the first part-file so that the columns of the lexicographically first file show
+        //     first.
+        //
+        //  2. If mergeRespectSummaries config is true, then there should be, at least,
+        //     "_metadata"s for all given files, so that we can ensure the columns of
+        //     the lexicographically first file show first.
+        //
+        //  3. If shouldMergeSchemas is false, but when multiple files are given, there is
+        //     no guarantee of the output order, since there might not be a summary file for the
+        //     lexicographically first file, which ends up putting ahead the columns of
+        //     the other files. However, this should be okay since not enabling
+        //     shouldMergeSchemas means (assumes) all the files have the same schemas.
+
+        val needMerged: Seq[FileStatus] =
+          if (mergeRespectSummaries) {
+            Seq()
+          } else {
+            filesByType.data
+          }
+        needMerged ++ filesByType.metadata ++ filesByType.commonMetadata
+      } else {
+        // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
+        // don't have this.
+        filesByType.commonMetadata.headOption
+            // Falls back to "_metadata"
+            .orElse(filesByType.metadata.headOption)
+            // Summary file(s) not found, the Parquet file is either corrupted, or different part-
+            // files contain conflicting user defined metadata (two or more values are associated
+            // with a same key in different files).  In either case, we fall back to any of the
+            // first part-file, and just assume all schemas are consistent.
+            .orElse(filesByType.data.headOption)
+            .toSeq
+      }
+    ParquetRelation.mergeSchemasInParallel(filesToTouch, sqlContext)
+  }
+
+  case class FileTypes(
+      data: Seq[FileStatus],
+      metadata: Seq[FileStatus],
+      commonMetadata: Seq[FileStatus])
+
+  private def splitFiles(allFiles: Seq[FileStatus]): FileTypes = {
+    // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
+    val leaves = allFiles.filter { f =>
+      isSummaryFile(f.getPath) ||
+          !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
+    }.toArray.sortBy(_.getPath.toString)
+
+    FileTypes(
+      data = leaves.filterNot(f => isSummaryFile(f.getPath)),
+      metadata =
+        leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE),
+      commonMetadata =
+        leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE))
+  }
+
+  private def isSummaryFile(file: Path): Boolean = {
+    file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
+        file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
+  }
+
   override def buildInternalScan(
+      sqlContext: SQLContext,
+      dataSchema: StructType,
       requiredColumns: Array[String],
       filters: Array[Filter],
-      inputFiles: Array[FileStatus],
-      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
+      bucketSet: Option[BitSet],
+      allFiles: Array[FileStatus],
+      broadcastedConf: Broadcast[SerializableConfiguration],
+      options: Map[String, String]): RDD[InternalRow] = {
     val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
     val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
     val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
@@ -341,6 +299,8 @@ private[sql] class ParquetRelation(
         assumeBinaryIsString,
         assumeInt96IsTimestamp) _
 
+    val inputFiles = splitFiles(allFiles).data.toArray
+
     // Create the function to set input paths at the driver side.
     val setInputPaths =
       ParquetRelation.initializeDriverSideJobFunc(inputFiles, parquetBlockSize) _
@@ -392,153 +352,46 @@ private[sql] class ParquetRelation(
       }
     }
   }
+}
 
-  private class MetadataCache {
-    // `FileStatus` objects of all "_metadata" files.
-    private var metadataStatuses: Array[FileStatus] = _
-
-    // `FileStatus` objects of all "_common_metadata" files.
-    private var commonMetadataStatuses: Array[FileStatus] = _
-
-    // `FileStatus` objects of all data files (Parquet part-files).
-    var dataStatuses: Array[FileStatus] = _
-
-    // Schema of the actual Parquet files, without partition columns discovered from partition
-    // directory paths.
-    var dataSchema: StructType = null
-
-    // Schema of the whole table, including partition columns.
-    var schema: StructType = _
-
-    // Cached leaves
-    var cachedLeaves: mutable.LinkedHashSet[FileStatus] = null
-
-    /**
-     * Refreshes `FileStatus`es, footers, partition spec, and table schema.
-     */
-    def refresh(): Unit = {
-      val currentLeafStatuses = cachedLeafStatuses()
-
-      // Check if cachedLeafStatuses is changed or not
-      val leafStatusesChanged = (cachedLeaves == null) ||
-        !cachedLeaves.equals(currentLeafStatuses)
-
-      if (leafStatusesChanged) {
-        cachedLeaves = currentLeafStatuses
-
-        // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
-        val leaves = currentLeafStatuses.filter { f =>
-          isSummaryFile(f.getPath) ||
-            !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
-        }.toArray.sortBy(_.getPath.toString)
-
-        dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
-        metadataStatuses =
-          leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
-        commonMetadataStatuses =
-          leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
-
-        dataSchema = {
-          val dataSchema0 = maybeDataSchema
-            .orElse(readSchema())
-            .orElse(maybeMetastoreSchema)
-            .getOrElse(throw new AnalysisException(
-              s"Failed to discover schema of Parquet file(s) in the following location(s):\n" +
-                paths.mkString("\n\t")))
-
-          // If this Parquet relation is converted from a Hive Metastore table, must reconcile case
-          // case insensitivity issue and possible schema mismatch (probably caused by schema
-          // evolution).
-          maybeMetastoreSchema
-            .map(ParquetRelation.mergeMetastoreParquetSchema(_, dataSchema0))
-            .getOrElse(dataSchema0)
+// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
+private[sql] class ParquetOutputWriter(
+    path: String,
+    bucketId: Option[Int],
+    context: TaskAttemptContext)
+  extends OutputWriter {
+
+  private val recordWriter: RecordWriter[Void, InternalRow] = {
+    val outputFormat = {
+      new ParquetOutputFormat[InternalRow]() {
+        // Here we override `getDefaultWorkFile` for two reasons:
+        //
+        //  1. To allow appending.  We need to generate unique output file names to avoid
+        //     overwriting existing files (either exist before the write job, or are just written
+        //     by other tasks within the same write job).
+        //
+        //  2. To allow dynamic partitioning.  Default `getDefaultWorkFile` uses
+        //     `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
+        //     partitions in the case of dynamic partitioning.
+        override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+          val configuration = context.getConfiguration
+          val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+          val taskAttemptId = context.getTaskAttemptID
+          val split = taskAttemptId.getTaskID.getId
+          val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
+          new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
         }
       }
     }
 
-    private def isSummaryFile(file: Path): Boolean = {
-      file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
-        file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
-    }
+    outputFormat.getRecordWriter(context)
+  }
 
-    private def readSchema(): Option[StructType] = {
-      // Sees which file(s) we need to touch in order to figure out the schema.
-      //
-      // Always tries the summary files first if users don't require a merged schema.  In this case,
-      // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
-      // groups information, and could be much smaller for large Parquet files with lots of row
-      // groups.  If no summary file is available, falls back to some random part-file.
-      //
-      // NOTE: Metadata stored in the summary files are merged from all part-files.  However, for
-      // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
-      // how to merge them correctly if some key is associated with different values in different
-      // part-files.  When this happens, Parquet simply gives up generating the summary file.  This
-      // implies that if a summary file presents, then:
-      //
-      //   1. Either all part-files have exactly the same Spark SQL schema, or
-      //   2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
-      //      their schemas may differ from each other).
-      //
-      // Here we tend to be pessimistic and take the second case into account.  Basically this means
-      // we can't trust the summary files if users require a merged schema, and must touch all part-
-      // files to do the merge.
-      val filesToTouch =
-        if (shouldMergeSchemas) {
-          // Also includes summary files, 'cause there might be empty partition directories.
-
-          // If mergeRespectSummaries config is true, we assume that all part-files are the same for
-          // their schema with summary files, so we ignore them when merging schema.
-          // If the config is disabled, which is the default setting, we merge all part-files.
-          // In this mode, we only need to merge schemas contained in all those summary files.
-          // You should enable this configuration only if you are very sure that for the parquet
-          // part-files to read there are corresponding summary files containing correct schema.
-
-          // As filed in SPARK-11500, the order of files to touch is a matter, which might affect
-          // the ordering of the output columns. There are several things to mention here.
-          //
-          //  1. If mergeRespectSummaries config is false, then it merges schemas by reducing from
-          //     the first part-file so that the columns of the lexicographically first file show
-          //     first.
-          //
-          //  2. If mergeRespectSummaries config is true, then there should be, at least,
-          //     "_metadata"s for all given files, so that we can ensure the columns of
-          //     the lexicographically first file show first.
-          //
-          //  3. If shouldMergeSchemas is false, but when multiple files are given, there is
-          //     no guarantee of the output order, since there might not be a summary file for the
-          //     lexicographically first file, which ends up putting ahead the columns of
-          //     the other files. However, this should be okay since not enabling
-          //     shouldMergeSchemas means (assumes) all the files have the same schemas.
-
-          val needMerged: Seq[FileStatus] =
-            if (mergeRespectSummaries) {
-              Seq()
-            } else {
-              dataStatuses
-            }
-          needMerged ++ metadataStatuses ++ commonMetadataStatuses
-        } else {
-          // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
-          // don't have this.
-          commonMetadataStatuses.headOption
-            // Falls back to "_metadata"
-            .orElse(metadataStatuses.headOption)
-            // Summary file(s) not found, the Parquet file is either corrupted, or different part-
-            // files contain conflicting user defined metadata (two or more values are associated
-            // with a same key in different files).  In either case, we fall back to any of the
-            // first part-file, and just assume all schemas are consistent.
-            .orElse(dataStatuses.headOption)
-            .toSeq
-        }
+  override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
 
-      assert(
-        filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined,
-        "No predefined schema found, " +
-          s"and no Parquet data files or summary files found under ${paths.mkString(", ")}.")
+  override protected[sql] def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row)
 
-      ParquetRelation.mergeSchemasInParallel(filesToTouch, sqlContext)
-    }
-  }
+  override def close(): Unit = recordWriter.close(context)
 }
 
 private[sql] object ParquetRelation extends Logging {
@@ -699,7 +552,7 @@ private[sql] object ParquetRelation extends Logging {
    * distinguish binary and string).  This method generates a correct schema by merging Metastore
    * schema data types and Parquet schema field names.
    */
-  private[parquet] def mergeMetastoreParquetSchema(
+  private[sql] def mergeMetastoreParquetSchema(
       metastoreSchema: StructType,
       parquetSchema: StructType): StructType = {
     def schemaConflictMessage: String =

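The schema discovery that previously lived in ParquetRelation's MetadataCache is now a pure function of the file listing handed to inferSchema. Condensed into a standalone sketch (assuming the nested FileTypes case class above is in scope), the file-selection rule is:

import org.apache.hadoop.fs.FileStatus

// Picks which files must be read to determine the schema.
def filesToTouch(
    filesByType: FileTypes,                // data / metadata / commonMetadata buckets
    shouldMergeSchemas: Boolean,
    mergeRespectSummaries: Boolean): Seq[FileStatus] = {
  if (shouldMergeSchemas) {
    // Merge every part-file's schema, unless summary files are trusted to cover them.
    val needMerged = if (mergeRespectSummaries) Seq.empty[FileStatus] else filesByType.data
    needMerged ++ filesByType.metadata ++ filesByType.commonMetadata
  } else {
    // Cheapest summary first ("_common_metadata"), then "_metadata",
    // then fall back to any single part-file.
    filesByType.commonMetadata.headOption
      .orElse(filesByType.metadata.headOption)
      .orElse(filesByType.data.headOption)
      .toSeq
  }
}
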
http://git-wip-us.apache.org/repos/asf/spark/blob/e720dda4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 2e41e88..0eae346 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -34,6 +34,7 @@ private[sql] class ResolveDataSource(sqlContext: SQLContext) extends Rule[Logica
       try {
         val resolved = ResolvedDataSource(
           sqlContext,
+          paths = Seq.empty,
           userSpecifiedSchema = None,
           partitionColumns = Array(),
           bucketSpec = None,
@@ -130,7 +131,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
         LogicalRelation(r: HadoopFsRelation, _, _), part, query, overwrite, _) =>
         // We need to make sure the partition columns specified by users do match partition
         // columns of the relation.
-        val existingPartitionColumns = r.partitionColumns.fieldNames.toSet
+        val existingPartitionColumns = r.partitionSchema.fieldNames.toSet
         val specifiedPartitionColumns = part.keySet
         if (existingPartitionColumns != specifiedPartitionColumns) {
           failAnalysis(s"Specified partition columns " +

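For reference, the partition-column validation in the hunk above reduces to a set comparison against the relation's partitionSchema. A standalone sketch (the Map value type is an assumption, and sys.error stands in for PreWriteCheck's failAnalysis helper):

import org.apache.spark.sql.sources.HadoopFsRelation

def checkPartitionColumns(r: HadoopFsRelation, part: Map[String, Option[String]]): Unit = {
  val existingPartitionColumns = r.partitionSchema.fieldNames.toSet
  val specifiedPartitionColumns = part.keySet
  if (existingPartitionColumns != specifiedPartitionColumns) {
    // PreWriteCheck raises an AnalysisException via failAnalysis at this point.
    sys.error(s"Specified partition columns (${specifiedPartitionColumns.mkString(", ")}) " +
      s"do not match the existing partition columns (${existingPartitionColumns.mkString(", ")}).")
  }
}
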
http://git-wip-us.apache.org/repos/asf/spark/blob/e720dda4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 8f3f633..b329725 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -31,25 +31,16 @@ import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
-import org.apache.spark.sql.execution.datasources.{CompressionCodecs, PartitionSpec}
+import org.apache.spark.sql.execution.datasources.CompressionCodecs
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.collection.BitSet
 
 /**
  * A data source for reading text files.
  */
-class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
-
-  override def createRelation(
-      sqlContext: SQLContext,
-      paths: Array[String],
-      dataSchema: Option[StructType],
-      partitionColumns: Option[StructType],
-      parameters: Map[String, String]): HadoopFsRelation = {
-    dataSchema.foreach(verifySchema)
-    new TextRelation(None, dataSchema, partitionColumns, paths, parameters)(sqlContext)
-  }
+class DefaultSource extends FileFormat with DataSourceRegister {
 
   override def shortName(): String = "text"
 
@@ -64,58 +55,21 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
         s"Text data source supports only a string column, but you have ${tpe.simpleString}.")
     }
   }
-}
-
-private[sql] class TextRelation(
-    val maybePartitionSpec: Option[PartitionSpec],
-    val textSchema: Option[StructType],
-    override val userDefinedPartitionColumns: Option[StructType],
-    override val paths: Array[String] = Array.empty[String],
-    parameters: Map[String, String] = Map.empty[String, String])
-    (@transient val sqlContext: SQLContext)
-  extends HadoopFsRelation(maybePartitionSpec, parameters) {
 
-  /** Data schema is always a single column, named "value" if original Data source has no schema. */
-  override def dataSchema: StructType =
-    textSchema.getOrElse(new StructType().add("value", StringType))
-  /** This is an internal data source that outputs internal row format. */
-  override val needConversion: Boolean = false
-
-
-  override private[sql] def buildInternalScan(
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      inputPaths: Array[FileStatus],
-      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
-    val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
-    val conf = job.getConfiguration
-    val paths = inputPaths.map(_.getPath).sortBy(_.toUri)
-
-    if (paths.nonEmpty) {
-      FileInputFormat.setInputPaths(job, paths: _*)
-    }
+  override def inferSchema(
+      sqlContext: SQLContext,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = Some(new StructType().add("value", StringType))
 
-    sqlContext.sparkContext.hadoopRDD(
-      conf.asInstanceOf[JobConf], classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
-      .mapPartitions { iter =>
-        val unsafeRow = new UnsafeRow(1)
-        val bufferHolder = new BufferHolder(unsafeRow)
-        val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)
-
-        iter.map { case (_, line) =>
-          // Writes to an UnsafeRow directly
-          bufferHolder.reset()
-          unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
-          unsafeRow.setTotalSize(bufferHolder.totalSize())
-          unsafeRow
-        }
-      }
-  }
+  override def prepareWrite(
+      sqlContext: SQLContext,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    verifySchema(dataSchema)
 
-  /** Write path. */
-  override def prepareJobForWrite(job: Job): OutputWriterFactory = {
     val conf = job.getConfiguration
-    val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
+    val compressionCodec = options.get("compression").map(CompressionCodecs.getCodecClassName)
     compressionCodec.foreach { codec =>
       CompressionCodecs.setCodecConfiguration(conf, codec)
     }
@@ -123,21 +77,54 @@ private[sql] class TextRelation(
     new OutputWriterFactory {
       override def newInstance(
           path: String,
+          bucketId: Option[Int],
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
+        if (bucketId.isDefined) {
+          throw new AnalysisException("Text doesn't support bucketing")
+        }
         new TextOutputWriter(path, dataSchema, context)
       }
     }
   }
 
-  override def equals(other: Any): Boolean = other match {
-    case that: TextRelation =>
-      paths.toSet == that.paths.toSet && partitionColumns == that.partitionColumns
-    case _ => false
-  }
+  override def buildInternalScan(
+      sqlContext: SQLContext,
+      dataSchema: StructType,
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      bucketSet: Option[BitSet],
+      inputFiles: Array[FileStatus],
+      broadcastedConf: Broadcast[SerializableConfiguration],
+      options: Map[String, String]): RDD[InternalRow] = {
+    verifySchema(dataSchema)
 
-  override def hashCode(): Int = {
-    Objects.hashCode(paths.toSet, partitionColumns)
+    val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
+    val conf = job.getConfiguration
+    val paths = inputFiles
+        .filterNot(_.getPath.getName startsWith "_")
+        .map(_.getPath)
+        .sortBy(_.toUri)
+
+    if (paths.nonEmpty) {
+      FileInputFormat.setInputPaths(job, paths: _*)
+    }
+
+    sqlContext.sparkContext.hadoopRDD(
+      conf.asInstanceOf[JobConf], classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
+        .mapPartitions { iter =>
+          val unsafeRow = new UnsafeRow(1)
+          val bufferHolder = new BufferHolder(unsafeRow)
+          val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)
+
+          iter.map { case (_, line) =>
+            // Writes to an UnsafeRow directly
+            bufferHolder.reset()
+            unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
+            unsafeRow.setTotalSize(bufferHolder.totalSize())
+            unsafeRow
+          }
+        }
   }
 }
 
@@ -170,3 +157,4 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemp
     recordWriter.close(context)
   }
 }
+

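The text scan above converts each input line into a single-column UnsafeRow. Pulled out of the diff and commented, the per-partition conversion looks roughly like this (a sketch assuming an iterator of org.apache.hadoop.io.Text values):

import org.apache.hadoop.io.Text
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}

def toUnsafeRows(lines: Iterator[Text]): Iterator[UnsafeRow] = {
  val unsafeRow = new UnsafeRow(1)                     // a single "value" column
  val bufferHolder = new BufferHolder(unsafeRow)
  val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)

  lines.map { line =>
    bufferHolder.reset()                               // reuse the buffer for each record
    unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
    unsafeRow.setTotalSize(bufferHolder.totalSize())
    unsafeRow                                          // note: the same row instance is reused
  }
}
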
http://git-wip-us.apache.org/repos/asf/spark/blob/e720dda4/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index f5f3654..6f81794 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.datasources.{PreInsertCastAndRename, ResolveDataSource}
+import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, PreInsertCastAndRename, ResolveDataSource}
 import org.apache.spark.sql.execution.exchange.EnsureRequirements
 import org.apache.spark.sql.util.ExecutionListenerManager
 
@@ -63,8 +63,9 @@ private[sql] class SessionState(ctx: SQLContext) {
     new Analyzer(catalog, functionRegistry, conf) {
       override val extendedResolutionRules =
         python.ExtractPythonUDFs ::
-          PreInsertCastAndRename ::
-          (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
+        PreInsertCastAndRename ::
+        DataSourceAnalysis ::
+        (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
 
       override val extendedCheckRules = Seq(datasources.PreWriteCheck(catalog))
     }

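DataSourceAnalysis is added to the analyzer's extended resolution rules. For readers unfamiliar with the hook, such a rule is just a Rule[LogicalPlan]; the skeleton below is a no-op placeholder showing the shape, not part of this patch:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object NoOpAnalysisRule extends Rule[LogicalPlan] {
  // A real rule (such as DataSourceAnalysis) pattern-matches on logical
  // operators and rewrites them; this placeholder returns the plan unchanged.
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}
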
http://git-wip-us.apache.org/repos/asf/spark/blob/e720dda4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 87ea7f5..12512a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -28,12 +28,11 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
-import org.apache.spark.sql.execution.{FileRelation, RDDConversions}
+import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.streaming.{FileStreamSource, Sink, Source}
 import org.apache.spark.sql.types.{StringType, StructType}
@@ -147,84 +146,6 @@ trait StreamSinkProvider {
 }
 
 /**
- * ::Experimental::
- * Implemented by objects that produce relations for a specific kind of data source
- * with a given schema and partitioned columns.  When Spark SQL is given a DDL operation with a
- * USING clause specified (to specify the implemented [[HadoopFsRelationProvider]]), a user defined
- * schema, and an optional list of partition columns, this interface is used to pass in the
- * parameters specified by a user.
- *
- * Users may specify the fully qualified class name of a given data source.  When that class is
- * not found Spark SQL will append the class name `DefaultSource` to the path, allowing for
- * less verbose invocation.  For example, 'org.apache.spark.sql.json' would resolve to the
- * data source 'org.apache.spark.sql.json.DefaultSource'
- *
- * A new instance of this class will be instantiated each time a DDL call is made.
- *
- * The difference between a [[RelationProvider]] and a [[HadoopFsRelationProvider]] is
- * that users need to provide a schema and a (possibly empty) list of partition columns when
- * using a [[HadoopFsRelationProvider]]. A relation provider can inherits both [[RelationProvider]],
- * and [[HadoopFsRelationProvider]] if it can support schema inference, user-specified
- * schemas, and accessing partitioned relations.
- *
- * @since 1.4.0
- */
-@Experimental
-trait HadoopFsRelationProvider extends StreamSourceProvider {
-  /**
-   * Returns a new base relation with the given parameters, a user defined schema, and a list of
-   * partition columns. Note: the parameters' keywords are case insensitive and this insensitivity
-   * is enforced by the Map that is passed to the function.
-   *
-   * @param dataSchema Schema of data columns (i.e., columns that are not partition columns).
-   */
-  def createRelation(
-      sqlContext: SQLContext,
-      paths: Array[String],
-      dataSchema: Option[StructType],
-      partitionColumns: Option[StructType],
-      parameters: Map[String, String]): HadoopFsRelation
-
-  private[sql] def createRelation(
-      sqlContext: SQLContext,
-      paths: Array[String],
-      dataSchema: Option[StructType],
-      partitionColumns: Option[StructType],
-      bucketSpec: Option[BucketSpec],
-      parameters: Map[String, String]): HadoopFsRelation = {
-    if (bucketSpec.isDefined) {
-      throw new AnalysisException("Currently we don't support bucketing for this data source.")
-    }
-    createRelation(sqlContext, paths, dataSchema, partitionColumns, parameters)
-  }
-
-  override def createSource(
-      sqlContext: SQLContext,
-      schema: Option[StructType],
-      providerName: String,
-      parameters: Map[String, String]): Source = {
-    val caseInsensitiveOptions = new CaseInsensitiveMap(parameters)
-    val path = caseInsensitiveOptions.getOrElse("path", {
-      throw new IllegalArgumentException("'path' is not specified")
-    })
-    val metadataPath = caseInsensitiveOptions.getOrElse("metadataPath", s"$path/_metadata")
-
-    def dataFrameBuilder(files: Array[String]): DataFrame = {
-      val relation = createRelation(
-        sqlContext,
-        files,
-        schema,
-        partitionColumns = None,
-        bucketSpec = None,
-        parameters)
-      DataFrame(sqlContext, LogicalRelation(relation))
-    }
-
-    new FileStreamSource(sqlContext, metadataPath, path, schema, providerName, dataFrameBuilder)
-  }
-}
-
-/**
  * @since 1.3.0
  */
 @DeveloperApi
@@ -409,20 +330,13 @@ abstract class OutputWriterFactory extends Serializable {
    * @param dataSchema Schema of the rows to be written. Partition columns are not included in the
    *        schema if the relation being written is partitioned.
    * @param context The Hadoop MapReduce task context.
-   *
    * @since 1.4.0
    */
-  def newInstance(
-      path: String,
-      dataSchema: StructType,
-      context: TaskAttemptContext): OutputWriter
-
   private[sql] def newInstance(
       path: String,
-      bucketId: Option[Int],
+      bucketId: Option[Int], // TODO: This doesn't belong here...
       dataSchema: StructType,
-      context: TaskAttemptContext): OutputWriter =
-    newInstance(path, dataSchema, context)
+      context: TaskAttemptContext): OutputWriter
 }
 
 /**
@@ -465,214 +379,165 @@ abstract class OutputWriter {
 }
 
 /**
- * ::Experimental::
- * A [[BaseRelation]] that provides much of the common code required for relations that store their
- * data to an HDFS compatible filesystem.
- *
- * For the read path, similar to [[PrunedFilteredScan]], it can eliminate unneeded columns and
- * filter using selected predicates before producing an RDD containing all matching tuples as
- * [[Row]] objects. In addition, when reading from Hive style partitioned tables stored in file
- * systems, it's able to discover partitioning information from the paths of input directories, and
- * perform partition pruning before start reading the data. Subclasses of [[HadoopFsRelation()]]
- * must override one of the four `buildScan` methods to implement the read path.
- *
- * For the write path, it provides the ability to write to both non-partitioned and partitioned
- * tables.  Directory layout of the partitioned tables is compatible with Hive.
- *
- * @constructor This constructor is for internal uses only. The [[PartitionSpec]] argument is for
- *              implementing metastore table conversion.
- *
- * @param maybePartitionSpec An [[HadoopFsRelation]] can be created with an optional
- *        [[PartitionSpec]], so that partition discovery can be skipped.
- *
- * @since 1.4.0
+ * Acts as a container for all of the metadata required to read from a datasource. All discovery,
+ * resolution and merging logic for schemas and partitions has been removed.
+ *
+ * @param location A [[FileCatalog]] that can enumerate the locations of all the files that comprise
+ *                 this relation.
+ * @param partitionSchema The schema of the columns (if any) that are used to partition the relation
+ * @param dataSchema The schema of any remaining columns.  Note that if any partition columns are
+ *                   present in the actual data files as well, they are removed.
+ * @param bucketSpec Describes the bucketing (hash-partitioning of the files by some column values).
+ * @param fileFormat A file format that can be used to read and write the data in files.
+ * @param options Configuration used when reading / writing data.
  */
-@Experimental
-abstract class HadoopFsRelation private[sql](
-    maybePartitionSpec: Option[PartitionSpec],
-    parameters: Map[String, String])
-  extends BaseRelation with FileRelation with Logging {
-
-  override def toString: String = getClass.getSimpleName
+case class HadoopFsRelation(
+    sqlContext: SQLContext,
+    location: FileCatalog,
+    partitionSchema: StructType,
+    dataSchema: StructType,
+    bucketSpec: Option[BucketSpec],
+    fileFormat: FileFormat,
+    options: Map[String, String]) extends BaseRelation with FileRelation {
+
+  val schema: StructType = {
+    val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
+    StructType(dataSchema ++ partitionSchema.filterNot { column =>
+      dataSchemaColumnNames.contains(column.name.toLowerCase)
+    })
+  }
 
-  def this() = this(None, Map.empty[String, String])
+  def partitionSchemaOption: Option[StructType] =
+    if (partitionSchema.isEmpty) None else Some(partitionSchema)
+  def partitionSpec: PartitionSpec = location.partitionSpec(partitionSchemaOption)
 
-  def this(parameters: Map[String, String]) = this(None, parameters)
+  def refresh(): Unit = location.refresh()
 
-  private[sql] def this(maybePartitionSpec: Option[PartitionSpec]) =
-    this(maybePartitionSpec, Map.empty[String, String])
+  override def toString: String =
+    s"$fileFormat part: ${partitionSchema.simpleString}, data: ${dataSchema.simpleString}"
 
-  private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+  /** Returns the list of files that will be read when scanning this relation. */
+  override def inputFiles: Array[String] =
+    location.allFiles().map(_.getPath.toUri.toString).toArray
+}
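
To make the column de-duplication in the combined `schema` above concrete, here is a small
self-contained sketch; the column names are hypothetical and nothing beyond the
de-duplication rule comes from the patch itself:

    import org.apache.spark.sql.types._

    object SchemaCombinationExample {
      def main(args: Array[String]): Unit = {
        // Columns that physically live in the data files.
        val dataSchema = StructType(Seq(
          StructField("id", LongType),
          StructField("date", StringType)))

        // Columns encoded in the directory layout; "date" also exists in the data files.
        val partitionSchema = StructType(Seq(
          StructField("date", StringType),
          StructField("country", StringType)))

        // Same rule as HadoopFsRelation.schema: keep every data column, then append only
        // the partition columns that are not already present (case-insensitively).
        val dataColumnNames = dataSchema.map(_.name.toLowerCase).toSet
        val combined = StructType(dataSchema ++ partitionSchema.filterNot { column =>
          dataColumnNames.contains(column.name.toLowerCase)
        })

        println(combined.simpleString)  // struct<id:bigint,date:string,country:string>
      }
    }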
 
-  private var _partitionSpec: PartitionSpec = _
+/**
+ * Used to read and write data stored in files to/from the [[InternalRow]] format.
+ */
+trait FileFormat {
+  /**
+   * When possible, this method should return the schema of the given `files`.  When the format
+   * does not support inference, or no valid files are given, it should return None.  In these
+   * cases Spark will require that the user specify the schema manually.
+   */
+  def inferSchema(
+      sqlContext: SQLContext,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType]
 
-  private[this] var malformedBucketFile = false
+  /**
+   * Prepares a write job and returns an [[OutputWriterFactory]].  Client-side job preparation can
+   * be put here.  For example, a user-defined output committer can be configured here by setting
+   * the output committer class under the key spark.sql.sources.outputCommitterClass in the conf.
+   */
+  def prepareWrite(
+      sqlContext: SQLContext,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory
 
-  private[sql] def maybeBucketSpec: Option[BucketSpec] = None
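+  /**
+   * Returns an [[RDD]] of [[InternalRow]] produced by reading `inputFiles`, keeping only
+   * `requiredColumns` and applying whatever pushed-down `filters` and bucket pruning
+   * (`bucketSet`) the format supports.
+   */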
+  def buildInternalScan(
+      sqlContext: SQLContext,
+      dataSchema: StructType,
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      bucketSet: Option[BitSet],
+      inputFiles: Array[FileStatus],
+      broadcastedConf: Broadcast[SerializableConfiguration],
+      options: Map[String, String]): RDD[InternalRow]
+}
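
As a rough orientation for implementers, here is a minimal sketch against the contract above.
The class name, package, fixed schema, and stubbed write/scan paths are illustrative assumptions,
not part of this patch, and FileFormat / OutputWriterFactory are assumed to be importable from
wherever this patch defines them:

    package org.apache.spark.sql.example  // under org.apache.spark.sql so private[sql]/[spark] helpers resolve

    import org.apache.hadoop.fs.FileStatus
    import org.apache.hadoop.mapreduce.Job

    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.sources.{FileFormat, Filter, OutputWriterFactory}  // adjust to the actual package
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.util.SerializableConfiguration
    import org.apache.spark.util.collection.BitSet

    /** A toy format whose records are a single string column; only schema inference is "real". */
    class SingleColumnTextFormat extends FileFormat {

      // If any files are present, claim a one-column string schema; otherwise return None so
      // that Spark asks the user for an explicit schema, as the contract above describes.
      override def inferSchema(
          sqlContext: SQLContext,
          options: Map[String, String],
          files: Seq[FileStatus]): Option[StructType] = {
        if (files.isEmpty) None
        else Some(StructType(StructField("value", StringType) :: Nil))
      }

      // The write path is not sketched here.
      override def prepareWrite(
          sqlContext: SQLContext,
          job: Job,
          options: Map[String, String],
          dataSchema: StructType): OutputWriterFactory = {
        throw new UnsupportedOperationException("write path not implemented in this sketch")
      }

      // A real format would turn inputFiles into rows; this sketch just returns an empty RDD.
      override def buildInternalScan(
          sqlContext: SQLContext,
          dataSchema: StructType,
          requiredColumns: Array[String],
          filters: Array[Filter],
          bucketSet: Option[BitSet],
          inputFiles: Array[FileStatus],
          broadcastedConf: Broadcast[SerializableConfiguration],
          options: Map[String, String]): RDD[InternalRow] = {
        sqlContext.sparkContext.emptyRDD[InternalRow]
      }
    }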
 
-  final private[sql] def getBucketSpec: Option[BucketSpec] =
-    maybeBucketSpec.filter(_ => sqlContext.conf.bucketingEnabled() && !malformedBucketFile)
+/**
+ * An interface for objects capable of enumerating the files that comprise a relation as well
+ * as the partitioning characteristics of those files.
+ */
+trait FileCatalog {
+  def paths: Seq[Path]
 
-  private class FileStatusCache {
-    var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus]
+  def partitionSpec(schema: Option[StructType]): PartitionSpec
 
-    var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
+  def allFiles(): Seq[FileStatus]
 
-    private def listLeafFiles(paths: Array[String]): mutable.LinkedHashSet[FileStatus] = {
-      if (paths.length >= sqlContext.conf.parallelPartitionDiscoveryThreshold) {
-        HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sqlContext.sparkContext)
-      } else {
-        val statuses = paths.flatMap { path =>
-          val hdfsPath = new Path(path)
-          val fs = hdfsPath.getFileSystem(hadoopConf)
-          val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-          logInfo(s"Listing $qualified on driver")
-          // Dummy jobconf to get to the pathFilter defined in configuration
-          val jobConf = new JobConf(hadoopConf, this.getClass())
-          val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
-          if (pathFilter != null) {
-            Try(fs.listStatus(qualified, pathFilter)).getOrElse(Array.empty)
-          } else {
-            Try(fs.listStatus(qualified)).getOrElse(Array.empty)
-          }
-        }.filterNot { status =>
-          val name = status.getPath.getName
-          name.toLowerCase == "_temporary" || name.startsWith(".")
-        }
+  def getStatus(path: Path): Array[FileStatus]
 
-        val (dirs, files) = statuses.partition(_.isDirectory)
+  def refresh(): Unit
+}
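
For comparison with the HDFS-backed implementation below, a hedged sketch of the smallest
possible FileCatalog: a fixed, unpartitioned list of files. The class name and package are
assumptions, and FileCatalog / PartitionSpec are assumed to be importable from wherever this
patch and the datasources package define them:

    package org.apache.spark.sql.example

    import org.apache.hadoop.fs.{FileStatus, Path}

    import org.apache.spark.sql.execution.datasources.PartitionSpec  // assumed location
    import org.apache.spark.sql.sources.FileCatalog                  // adjust to the actual package
    import org.apache.spark.sql.types.StructType

    /** A trivial catalog over an explicit list of files, with no partitioning at all. */
    class StaticFileCatalog(files: Seq[FileStatus]) extends FileCatalog {

      override def paths: Seq[Path] = files.map(_.getPath.getParent).distinct

      // No partition columns and no partitions, regardless of any user-supplied schema.
      override def partitionSpec(schema: Option[StructType]): PartitionSpec =
        PartitionSpec(StructType(Nil), Seq.empty)

      override def allFiles(): Seq[FileStatus] = files

      override def getStatus(path: Path): Array[FileStatus] =
        files.filter(_.getPath.getParent == path).toArray

      // The file list is fixed, so there is nothing to re-list.
      override def refresh(): Unit = ()
    }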
 
-        // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-        if (dirs.isEmpty) {
-          mutable.LinkedHashSet(files: _*)
-        } else {
-          mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath.toString))
-        }
-      }
-    }
+/**
+ * A file catalog that caches metadata gathered by scanning all the files present in `paths`
+ * recursively.
+ */
+class HDFSFileCatalog(
+    val sqlContext: SQLContext,
+    val parameters: Map[String, String],
+    val paths: Seq[Path])
+  extends FileCatalog with Logging {
 
-    def refresh(): Unit = {
-      val files = listLeafFiles(paths)
+  private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
 
-      leafFiles.clear()
-      leafDirToChildrenFiles.clear()
+  var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus]
+  var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
+  var cachedPartitionSpec: PartitionSpec = _
 
-      leafFiles ++= files.map(f => f.getPath -> f)
-      leafDirToChildrenFiles ++= files.toArray.groupBy(_.getPath.getParent)
+  def partitionSpec(schema: Option[StructType]): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+      cachedPartitionSpec = inferPartitioning(schema)
     }
-  }
 
-  private lazy val fileStatusCache = {
-    val cache = new FileStatusCache
-    cache.refresh()
-    cache
+    cachedPartitionSpec
   }
 
-  protected def cachedLeafStatuses(): mutable.LinkedHashSet[FileStatus] = {
-    mutable.LinkedHashSet(fileStatusCache.leafFiles.values.toArray: _*)
-  }
+  refresh()
 
-  final private[sql] def partitionSpec: PartitionSpec = {
-    if (_partitionSpec == null) {
-      _partitionSpec = maybePartitionSpec
-        .flatMap {
-          case spec if spec.partitions.nonEmpty =>
-            Some(spec.copy(partitionColumns = spec.partitionColumns.asNullable))
-          case _ =>
-            None
-        }
-        .orElse {
-          // We only know the partition columns and their data types. We need to discover
-          // partition values.
-          userDefinedPartitionColumns.map { partitionSchema =>
-            val spec = discoverPartitions()
-            val partitionColumnTypes = spec.partitionColumns.map(_.dataType)
-            val castedPartitions = spec.partitions.map { case p @ Partition(values, path) =>
-              val literals = partitionColumnTypes.zipWithIndex.map { case (dt, i) =>
-                Literal.create(values.get(i, dt), dt)
-              }
-              val castedValues = partitionSchema.zip(literals).map { case (field, literal) =>
-                Cast(literal, field.dataType).eval()
-              }
-              p.copy(values = InternalRow.fromSeq(castedValues))
-            }
-            PartitionSpec(partitionSchema, castedPartitions)
-          }
-        }
-        .getOrElse {
-          if (sqlContext.conf.partitionDiscoveryEnabled()) {
-            discoverPartitions()
-          } else {
-            PartitionSpec(StructType(Nil), Array.empty[Partition])
-          }
-        }
-    }
-    _partitionSpec
-  }
+  def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq
 
-  /**
-   * Paths of this relation.  For partitioned relations, it should be root directories
-   * of all partition directories.
-   *
-   * @since 1.4.0
-   */
-  def paths: Array[String]
-
-  /**
-   * Contains a set of paths that are considered as the base dirs of the input datasets.
-   * The partitioning discovery logic will make sure it will stop when it reaches any
-   * base path. By default, the paths of the dataset provided by users will be base paths.
-   * For example, if a user uses `sqlContext.read.parquet("/path/something=true/")`, the base path
-   * will be `/path/something=true/`, and the returned DataFrame will not contain a column of
-   * `something`. If users want to override the basePath. They can set `basePath` in the options
-   * to pass the new base path to the data source.
-   * For the above example, if the user-provided base path is `/path/`, the returned
-   * DataFrame will have the column of `something`.
-   */
-  private def basePaths: Set[Path] = {
-    val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath)))
-    userDefinedBasePath.getOrElse {
-      // If the user does not provide basePath, we will just use paths.
-      val pathSet = paths.toSet
-      pathSet.map(p => new Path(p))
-    }.map { hdfsPath =>
-      // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
-      val fs = hdfsPath.getFileSystem(hadoopConf)
-      hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-    }
-  }
-
-  override def inputFiles: Array[String] = cachedLeafStatuses().map(_.getPath.toString).toArray
+  def getStatus(path: Path): Array[FileStatus] = leafDirToChildrenFiles(path)
 
-  override def sizeInBytes: Long = cachedLeafStatuses().map(_.getLen).sum
-
-  /**
-   * Partition columns.  Can be either defined by [[userDefinedPartitionColumns]] or automatically
-   * discovered.  Note that they should always be nullable.
-   *
-   * @since 1.4.0
-   */
-  final def partitionColumns: StructType =
-    userDefinedPartitionColumns.getOrElse(partitionSpec.partitionColumns)
+  private def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
+    if (paths.length >= sqlContext.conf.parallelPartitionDiscoveryThreshold) {
+      HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sqlContext.sparkContext)
+    } else {
+      val statuses = paths.flatMap { path =>
+        val fs = path.getFileSystem(hadoopConf)
+        logInfo(s"Listing $path on driver")
+        // Dummy jobconf to get to the pathFilter defined in configuration
+        val jobConf = new JobConf(hadoopConf, this.getClass())
+        val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+        if (pathFilter != null) {
+          Try(fs.listStatus(path, pathFilter)).getOrElse(Array.empty)
+        } else {
+          Try(fs.listStatus(path)).getOrElse(Array.empty)
+        }
+      }.filterNot { status =>
+        val name = status.getPath.getName
+        name.toLowerCase == "_temporary" || name.startsWith(".")
+      }
 
-  /**
-   * Optional user defined partition columns.
-   *
-   * @since 1.4.0
-   */
-  def userDefinedPartitionColumns: Option[StructType] = None
+      val (dirs, files) = statuses.partition(_.isDirectory)
 
-  private[sql] def refresh(): Unit = {
-    fileStatusCache.refresh()
-    if (sqlContext.conf.partitionDiscoveryEnabled()) {
-      _partitionSpec = discoverPartitions()
+      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
+      if (dirs.isEmpty) {
+        mutable.LinkedHashSet(files: _*)
+      } else {
+        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+      }
     }
   }
 
-  private def discoverPartitions(): PartitionSpec = {
+  def inferPartitioning(schema: Option[StructType]): PartitionSpec = {
     // We use leaf dirs containing data files to discover the schema.
-    val leafDirs = fileStatusCache.leafDirToChildrenFiles.keys.toSeq
-    userDefinedPartitionColumns match {
+    val leafDirs = leafDirToChildrenFiles.keys.toSeq
+    schema match {
       case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
         val spec = PartitioningUtils.parsePartitions(
           leafDirs,
@@ -693,9 +558,7 @@ abstract class HadoopFsRelation private[sql](
         PartitionSpec(userProvidedSchema, spec.partitions.map { part =>
           part.copy(values = castPartitionValuesToUserSchema(part.values))
         })
-
-      case _ =>
-        // user did not provide a partitioning schema
+      case None =>
         PartitioningUtils.parsePartitions(
           leafDirs,
           PartitioningUtils.DEFAULT_PARTITION_NAME,
@@ -705,271 +568,51 @@ abstract class HadoopFsRelation private[sql](
   }
 
   /**
-   * Schema of this relation.  It consists of columns appearing in [[dataSchema]] and all partition
-   * columns not appearing in [[dataSchema]].
-   *
-   * @since 1.4.0
-   */
-  override lazy val schema: StructType = {
-    val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
-    StructType(dataSchema ++ partitionColumns.filterNot { column =>
-      dataSchemaColumnNames.contains(column.name.toLowerCase)
-    })
-  }
-
-  /**
-   * Groups the input files by bucket id, if bucketing is enabled and this data source is bucketed.
-   * Returns None if there exists any malformed bucket files.
+   * Contains a set of paths that are considered the base directories of the input datasets.
+   * The partition discovery logic will stop when it reaches any base path.
+   * By default, the dataset paths provided by users are the base paths.
+   * For example, if a user calls `sqlContext.read.parquet("/path/something=true/")`, the base path
+   * will be `/path/something=true/`, and the returned DataFrame will not contain a column named
+   * `something`. If users want to override the base path, they can set `basePath` in the options
+   * to pass the new base path to the data source.
+   * For the above example, if the user-provided base path is `/path/`, the returned
+   * DataFrame will contain a column named `something`.
    */
-  private def groupBucketFiles(
-      files: Array[FileStatus]): Option[scala.collection.Map[Int, Array[FileStatus]]] = {
-    malformedBucketFile = false
-    if (getBucketSpec.isDefined) {
-      val groupedBucketFiles = mutable.HashMap.empty[Int, mutable.ArrayBuffer[FileStatus]]
-      var i = 0
-      while (!malformedBucketFile && i < files.length) {
-        val bucketId = BucketingUtils.getBucketId(files(i).getPath.getName)
-        if (bucketId.isEmpty) {
-          logError(s"File ${files(i).getPath} is expected to be a bucket file, but there is no " +
-            "bucket id information in file name. Fall back to non-bucketing mode.")
-          malformedBucketFile = true
-        } else {
-          val bucketFiles =
-            groupedBucketFiles.getOrElseUpdate(bucketId.get, mutable.ArrayBuffer.empty)
-          bucketFiles += files(i)
-        }
-        i += 1
-      }
-      if (malformedBucketFile) None else Some(groupedBucketFiles.mapValues(_.toArray))
-    } else {
-      None
-    }
-  }
-
-  final private[sql] def buildInternalScan(
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      bucketSet: Option[BitSet],
-      inputPaths: Array[String],
-      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
-    val inputStatuses = inputPaths.flatMap { input =>
-      val path = new Path(input)
-
-      // First assumes `input` is a directory path, and tries to get all files contained in it.
-      fileStatusCache.leafDirToChildrenFiles.getOrElse(
-        path,
-        // Otherwise, `input` might be a file path
-        fileStatusCache.leafFiles.get(path).toArray
-      ).filter { status =>
-        val name = status.getPath.getName
-        !name.startsWith("_") && !name.startsWith(".")
-      }
-    }
-
-    groupBucketFiles(inputStatuses).map { groupedBucketFiles =>
-      // For each bucket id, firstly we get all files belong to this bucket, by detecting bucket
-      // id from file name. Then read these files into a RDD(use one-partition empty RDD for empty
-      // bucket), and coalesce it to one partition. Finally union all bucket RDDs to one result.
-      val perBucketRows = (0 until maybeBucketSpec.get.numBuckets).map { bucketId =>
-        // If the current bucketId is not set in the bucket bitSet, skip scanning it.
-        if (bucketSet.nonEmpty && !bucketSet.get.get(bucketId)){
-          sqlContext.emptyResult
-        } else {
-          // When all the buckets need a scan (i.e., bucketSet is equal to None)
-          // or when the current bucket need a scan (i.e., the bit of bucketId is set to true)
-          groupedBucketFiles.get(bucketId).map { inputStatuses =>
-            buildInternalScan(requiredColumns, filters, inputStatuses, broadcastedConf).coalesce(1)
-          }.getOrElse(sqlContext.emptyResult)
-        }
-      }
-
-      new UnionRDD(sqlContext.sparkContext, perBucketRows)
-    }.getOrElse {
-      buildInternalScan(requiredColumns, filters, inputStatuses, broadcastedConf)
+  private def basePaths: Set[Path] = {
+    val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath)))
+    userDefinedBasePath.getOrElse {
+      // If the user does not provide basePath, we will just use paths.
+      paths.toSet
+    }.map { hdfsPath =>
+      // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
+      val fs = hdfsPath.getFileSystem(hadoopConf)
+      hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
     }
   }
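
From the user side, the basePath behaviour documented above looks roughly like this; the
directory layout is hypothetical and a `sqlContext` is assumed to be in scope:

    // Layout on disk: /data/events/date=2016-03-07/part-00000.parquet, ...

    // Default: the path passed in is itself the base path, so partition discovery stops there
    // and the resulting DataFrame has no "date" column.
    val withoutDate = sqlContext.read.parquet("/data/events/date=2016-03-07/")

    // Overriding basePath moves the base up to /data/events, so discovery picks up
    // "date" as a partition column again.
    val withDate = sqlContext.read
      .option("basePath", "/data/events/")
      .parquet("/data/events/date=2016-03-07/")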
 
-  /**
-   * Specifies schema of actual data files.  For partitioned relations, if one or more partitioned
-   * columns are contained in the data files, they should also appear in `dataSchema`.
-   *
-   * @since 1.4.0
-   */
-  def dataSchema: StructType
-
-  /**
-   * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within
-   * this relation. For partitioned relations, this method is called for each selected partition,
-   * and builds an `RDD[Row]` containing all rows within that single partition.
-   *
-   * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
-   *        relation. For a partitioned relation, it contains paths of all data files in a single
-   *        selected partition.
-   *
-   * @since 1.4.0
-   */
-  def buildScan(inputFiles: Array[FileStatus]): RDD[Row] = {
-    throw new UnsupportedOperationException(
-      "At least one buildScan() method should be overridden to read the relation.")
-  }
-
-  /**
-   * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within
-   * this relation. For partitioned relations, this method is called for each selected partition,
-   * and builds an `RDD[Row]` containing all rows within that single partition.
-   *
-   * @param requiredColumns Required columns.
-   * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
-   *        relation. For a partitioned relation, it contains paths of all data files in a single
-   *        selected partition.
-   *
-   * @since 1.4.0
-   */
-  // TODO Tries to eliminate the extra Catalyst-to-Scala conversion when `needConversion` is true
-  //
-  // PR #7626 separated `Row` and `InternalRow` completely.  One of the consequences is that we can
-  // no longer treat an `InternalRow` containing Catalyst values as a `Row`.  Thus we have to
-  // introduce another row value conversion for data sources whose `needConversion` is true.
-  def buildScan(requiredColumns: Array[String], inputFiles: Array[FileStatus]): RDD[Row] = {
-    // Yeah, to workaround serialization...
-    val dataSchema = this.dataSchema
-    val needConversion = this.needConversion
-
-    val requiredOutput = requiredColumns.map { col =>
-      val field = dataSchema(col)
-      BoundReference(dataSchema.fieldIndex(col), field.dataType, field.nullable)
-    }.toSeq
-
-    val rdd: RDD[Row] = buildScan(inputFiles)
-    val converted: RDD[InternalRow] =
-      if (needConversion) {
-        RDDConversions.rowToRowRdd(rdd, dataSchema.fields.map(_.dataType))
-      } else {
-        rdd.asInstanceOf[RDD[InternalRow]]
-      }
+  def refresh(): Unit = {
+    val files = listLeafFiles(paths)
 
-    converted.mapPartitions { rows =>
-      val buildProjection =
-        GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes)
+    leafFiles.clear()
+    leafDirToChildrenFiles.clear()
 
-      val projectedRows = {
-        val mutableProjection = buildProjection()
-        rows.map(r => mutableProjection(r))
-      }
+    leafFiles ++= files.map(f => f.getPath -> f)
+    leafDirToChildrenFiles ++= files.toArray.groupBy(_.getPath.getParent)
 
-      if (needConversion) {
-        val requiredSchema = StructType(requiredColumns.map(dataSchema(_)))
-        val toScala = CatalystTypeConverters.createToScalaConverter(requiredSchema)
-        projectedRows.map(toScala(_).asInstanceOf[Row])
-      } else {
-        projectedRows
-      }
-    }.asInstanceOf[RDD[Row]]
+    cachedPartitionSpec = null
   }
 
-  /**
-   * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within
-   * this relation. For partitioned relations, this method is called for each selected partition,
-   * and builds an `RDD[Row]` containing all rows within that single partition.
-   *
-   * @param requiredColumns Required columns.
-   * @param filters Candidate filters to be pushed down. The actual filter should be the conjunction
-   *        of all `filters`.  The pushed down filters are currently purely an optimization as they
-   *        will all be evaluated again. This means it is safe to use them with methods that produce
-   *        false positives such as filtering partitions based on a bloom filter.
-   * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
-   *        relation. For a partitioned relation, it contains paths of all data files in a single
-   *        selected partition.
-   *
-   * @since 1.4.0
-   */
-  def buildScan(
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      inputFiles: Array[FileStatus]): RDD[Row] = {
-    buildScan(requiredColumns, inputFiles)
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: HDFSFileCatalog => paths.toSet == hdfs.paths.toSet
+    case _ => false
   }
 
-  /**
-   * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within
-   * this relation. For partitioned relations, this method is called for each selected partition,
-   * and builds an `RDD[Row]` containing all rows within that single partition.
-   *
-   * Note: This interface is subject to change in future.
-   *
-   * @param requiredColumns Required columns.
-   * @param filters Candidate filters to be pushed down. The actual filter should be the conjunction
-   *        of all `filters`.  The pushed down filters are currently purely an optimization as they
-   *        will all be evaluated again. This means it is safe to use them with methods that produce
-   *        false positives such as filtering partitions based on a bloom filter.
-   * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
-   *        relation. For a partitioned relation, it contains paths of all data files in a single
-   *        selected partition.
-   * @param broadcastedConf A shared broadcast Hadoop Configuration, which can be used to reduce the
-   *                        overhead of broadcasting the Configuration for every Hadoop RDD.
-   *
-   * @since 1.4.0
-   */
-  private[sql] def buildScan(
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      inputFiles: Array[FileStatus],
-      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
-    buildScan(requiredColumns, filters, inputFiles)
-  }
-
-  /**
-   * For a non-partitioned relation, this method builds an `RDD[InternalRow]` containing all rows
-   * within this relation. For partitioned relations, this method is called for each selected
-   * partition, and builds an `RDD[InternalRow]` containing all rows within that single partition.
-   *
-   * Note:
-   *
-   * 1. Rows contained in the returned `RDD[InternalRow]` are assumed to be `UnsafeRow`s.
-   * 2. This interface is subject to change in future.
-   *
-   * @param requiredColumns Required columns.
-   * @param filters Candidate filters to be pushed down. The actual filter should be the conjunction
-   *        of all `filters`.  The pushed down filters are currently purely an optimization as they
-   *        will all be evaluated again. This means it is safe to use them with methods that produce
-   *        false positives such as filtering partitions based on a bloom filter.
-   * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
-   *        relation. For a partitioned relation, it contains paths of all data files in a single
-   *        selected partition.
-   * @param broadcastedConf A shared broadcast Hadoop Configuration, which can be used to reduce the
-   *        overhead of broadcasting the Configuration for every Hadoop RDD.
-   */
-  private[sql] def buildInternalScan(
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      inputFiles: Array[FileStatus],
-      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
-    val requiredSchema = StructType(requiredColumns.map(dataSchema.apply))
-    val internalRows = {
-      val externalRows = buildScan(requiredColumns, filters, inputFiles, broadcastedConf)
-      execution.RDDConversions.rowToRowRdd(externalRows, requiredSchema.map(_.dataType))
-    }
-
-    internalRows.mapPartitions { iterator =>
-      val unsafeProjection = UnsafeProjection.create(requiredSchema)
-      iterator.map(unsafeProjection)
-    }
-  }
-
-  /**
-   * Prepares a write job and returns an [[OutputWriterFactory]].  Client side job preparation can
-   * be put here.  For example, user defined output committer can be configured here
-   * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
-   *
-   * Note that the only side effect expected here is mutating `job` via its setters.  Especially,
-   * Spark SQL caches [[BaseRelation]] instances for performance, mutating relation internal states
-   * may cause unexpected behaviors.
-   *
-   * @since 1.4.0
-   */
-  def prepareJobForWrite(job: Job): OutputWriterFactory
+  override def hashCode(): Int = paths.toSet.hashCode()
 }
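
Putting the pieces together, here is a hedged sketch of how a HadoopFsRelation might be
assembled by hand from an HDFSFileCatalog and a FileFormat (the toy SingleColumnTextFormat
sketched earlier). The path is hypothetical, a `sqlContext` is assumed to be in scope, and the
classes from this patch are assumed to be imported:

    import org.apache.hadoop.fs.Path

    val catalog = new HDFSFileCatalog(
      sqlContext,
      parameters = Map.empty,
      paths = Seq(new Path("hdfs:///data/logs")))

    val format = new SingleColumnTextFormat()

    val dataSchema = format
      .inferSchema(sqlContext, options = Map.empty, files = catalog.allFiles())
      .getOrElse(throw new IllegalArgumentException("Unable to infer schema"))

    val relation = HadoopFsRelation(
      sqlContext,
      location = catalog,
      partitionSchema = catalog.partitionSpec(None).partitionColumns,
      dataSchema = dataSchema,
      bucketSpec = None,
      fileFormat = format,
      options = Map.empty)

    println(relation.schema.simpleString)
    println(relation.inputFiles.length)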
 
+/**
+ * Helper methods for gathering metadata from HDFS.
+ */
 private[sql] object HadoopFsRelation extends Logging {
   // We don't filter files/directories whose name start with "_" except "_temporary" here, as
   // specific data sources may take advantages over them (e.g. Parquet _metadata and
@@ -1009,17 +652,17 @@ private[sql] object HadoopFsRelation extends Logging {
       accessTime: Long)
 
   def listLeafFilesInParallel(
-      paths: Array[String],
+      paths: Seq[Path],
       hadoopConf: Configuration,
       sparkContext: SparkContext): mutable.LinkedHashSet[FileStatus] = {
     logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
 
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
-    val fakeStatuses = sparkContext.parallelize(paths).flatMap { path =>
-      val hdfsPath = new Path(path)
-      val fs = hdfsPath.getFileSystem(serializableConfiguration.value)
-      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-      Try(listLeafFiles(fs, fs.getFileStatus(qualified))).getOrElse(Array.empty)
+    val serializedPaths = paths.map(_.toString)
+
+    val fakeStatuses = sparkContext.parallelize(serializedPaths).map(new Path(_)).flatMap { path =>
+      val fs = path.getFileSystem(serializableConfiguration.value)
+      Try(listLeafFiles(fs, fs.getFileStatus(path))).getOrElse(Array.empty)
     }.map { status =>
       FakeFileStatus(
         status.getPath.toString,

http://git-wip-us.apache.org/repos/asf/spark/blob/e720dda4/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index a824759..55153cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -889,7 +889,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         .write.format("parquet").save("temp")
     }
     assert(e.getMessage.contains("Duplicate column(s)"))
-    assert(e.getMessage.contains("parquet"))
     assert(e.getMessage.contains("column1"))
     assert(!e.getMessage.contains("column2"))
 
@@ -900,7 +899,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         .write.format("json").save("temp")
     }
     assert(f.getMessage.contains("Duplicate column(s)"))
-    assert(f.getMessage.contains("JSON"))
     assert(f.getMessage.contains("column1"))
     assert(f.getMessage.contains("column3"))
     assert(!f.getMessage.contains("column2"))

http://git-wip-us.apache.org/repos/asf/spark/blob/e720dda4/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index f59faa0..182f287 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1741,7 +1741,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     val e3 = intercept[AnalysisException] {
       sql("select * from json.invalid_file")
     }
-    assert(e3.message.contains("No input paths specified"))
+    assert(e3.message.contains("Unable to infer schema"))
   }
 
   test("SortMergeJoin returns wrong results when using UnsafeRows") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org