You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2015/05/15 10:21:12 UTC

[2/2] spark git commit: [SPARK-7591] [SQL] Partitioning support API tweaks

[SPARK-7591] [SQL] Partitioning support API tweaks

Please see [SPARK-7591] [1] for the details.

/cc rxin marmbrus yhuai

[1]: https://issues.apache.org/jira/browse/SPARK-7591

Author: Cheng Lian <li...@databricks.com>

Closes #6150 from liancheng/spark-7591 and squashes the following commits:

af422e7 [Cheng Lian] Addresses @rxin's comments
37d1738 [Cheng Lian] Fixes HadoopFsRelation partition columns initialization
2fc680a [Cheng Lian] Fixes Scala style issue
189ad23 [Cheng Lian] Removes HadoopFsRelation constructor arguments
522c24e [Cheng Lian] Adds OutputWriterFactory
047d40d [Cheng Lian] Renames FSBased* to HadoopFs*, also renamed FSBasedParquetRelation back to ParquetRelation2


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fdf5bba3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fdf5bba3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fdf5bba3

Branch: refs/heads/master
Commit: fdf5bba35d201fe0de3901b4d47262c485c76569
Parents: 9476148
Author: Cheng Lian <li...@databricks.com>
Authored: Fri May 15 16:20:49 2015 +0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Fri May 15 16:20:49 2015 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLContext.scala |  14 +-
 .../spark/sql/parquet/fsBasedParquet.scala      | 565 ------------------
 .../apache/spark/sql/parquet/newParquet.scala   | 570 +++++++++++++++++++
 .../spark/sql/sources/DataSourceStrategy.scala  |  10 +-
 .../spark/sql/sources/PartitioningUtils.scala   |   4 +
 .../org/apache/spark/sql/sources/commands.scala |  23 +-
 .../org/apache/spark/sql/sources/ddl.scala      |   8 +-
 .../apache/spark/sql/sources/interfaces.scala   | 140 +++--
 .../org/apache/spark/sql/sources/rules.scala    |   2 +-
 .../spark/sql/parquet/ParquetFilterSuite.scala  |   2 +-
 .../spark/sql/parquet/ParquetSchemaSuite.scala  |  12 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  12 +-
 .../spark/sql/hive/execution/commands.scala     |   2 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |   6 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |   8 +-
 .../apache/spark/sql/hive/parquetSuites.scala   |  20 +-
 .../spark/sql/sources/SimpleTextRelation.scala  |  47 +-
 .../sql/sources/fsBasedRelationSuites.scala     | 564 ------------------
 .../sql/sources/hadoopFsRelationSuites.scala    | 564 ++++++++++++++++++
 19 files changed, 1287 insertions(+), 1286 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index b33a700..9fb355e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.ParserDialect
 import org.apache.spark.sql.execution.{Filter, _}
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
 import org.apache.spark.sql.json._
-import org.apache.spark.sql.parquet.FSBasedParquetRelation
+import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -610,7 +610,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     } else if (conf.parquetUseDataSourceApi) {
       val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray
       baseRelationToDataFrame(
-        new FSBasedParquetRelation(
+        new ParquetRelation2(
           globbedPaths.map(_.toString), None, None, Map.empty[String, String])(this))
     } else {
       DataFrame(this, parquet.ParquetRelation(
@@ -989,7 +989,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   def jdbc(url: String, table: String): DataFrame = {
     jdbc(url, table, JDBCRelation.columnPartition(null), new Properties())
   }
-  
+
   /**
    * :: Experimental ::
    * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1002,7 +1002,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   def jdbc(url: String, table: String, properties: Properties): DataFrame = {
     jdbc(url, table, JDBCRelation.columnPartition(null), properties)
   }
-  
+
   /**
    * :: Experimental ::
    * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1020,7 +1020,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   @Experimental
   def jdbc(
       url: String,
-      table: String,  
+      table: String,
       columnName: String,
       lowerBound: Long,
       upperBound: Long,
@@ -1056,7 +1056,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     val parts = JDBCRelation.columnPartition(partitioning)
     jdbc(url, table, parts, properties)
   }
-  
+
   /**
    * :: Experimental ::
    * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1093,7 +1093,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     }
     jdbc(url, table, parts, properties)
   }
-  
+
   private def jdbc(
       url: String,
       table: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala
deleted file mode 100644
index c83a9c3..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala
+++ /dev/null
@@ -1,565 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.util.{List => JList}
-
-import scala.collection.JavaConversions._
-import scala.util.Try
-
-import com.google.common.base.Objects
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import parquet.filter2.predicate.FilterApi
-import parquet.format.converter.ParquetMetadataConverter
-import parquet.hadoop._
-import parquet.hadoop.metadata.CompressionCodecName
-import parquet.hadoop.util.ContextUtil
-
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.RDD._
-import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.sql.{Row, SQLConf, SQLContext}
-import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
-
-private[sql] class DefaultSource extends FSBasedRelationProvider {
-  override def createRelation(
-      sqlContext: SQLContext,
-      paths: Array[String],
-      schema: Option[StructType],
-      partitionColumns: Option[StructType],
-      parameters: Map[String, String]): FSBasedRelation = {
-    val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty))
-    new FSBasedParquetRelation(paths, schema, partitionSpec, parameters)(sqlContext)
-  }
-}
-
-// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
-private[sql] class ParquetOutputWriter extends OutputWriter {
-  private var recordWriter: RecordWriter[Void, Row] = _
-  private var taskAttemptContext: TaskAttemptContext = _
-
-  override def init(
-      path: String,
-      dataSchema: StructType,
-      context: TaskAttemptContext): Unit = {
-    val conf = context.getConfiguration
-    val outputFormat = {
-      // When appending new Parquet files to an existing Parquet file directory, to avoid
-      // overwriting existing data files, we need to find out the max task ID encoded in these data
-      // file names.
-      // TODO Make this snippet a utility function for other data source developers
-      val maxExistingTaskId = {
-        // Note that `path` may point to a temporary location.  Here we retrieve the real
-        // destination path from the configuration
-        val outputPath = new Path(conf.get("spark.sql.sources.output.path"))
-        val fs = outputPath.getFileSystem(conf)
-
-        if (fs.exists(outputPath)) {
-          // Pattern used to match task ID in part file names, e.g.:
-          //
-          //   part-r-00001.gz.part
-          //          ^~~~~
-          val partFilePattern = """part-.-(\d{1,}).*""".r
-
-          fs.listStatus(outputPath).map(_.getPath.getName).map {
-            case partFilePattern(id) => id.toInt
-            case name if name.startsWith("_") => 0
-            case name if name.startsWith(".") => 0
-            case name => sys.error(
-              s"""Trying to write Parquet files to directory $outputPath,
-                 |but found items with illegal name "$name"
-               """.stripMargin.replace('\n', ' ').trim)
-          }.reduceOption(_ max _).getOrElse(0)
-        } else {
-          0
-        }
-      }
-
-      new ParquetOutputFormat[Row]() {
-        // Here we override `getDefaultWorkFile` for two reasons:
-        //
-        //  1. To allow appending.  We need to generate output file name based on the max available
-        //     task ID computed above.
-        //
-        //  2. To allow dynamic partitioning.  Default `getDefaultWorkFile` uses
-        //     `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
-        //     partitions in the case of dynamic partitioning.
-        override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-          val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1
-          new Path(path, f"part-r-$split%05d$extension")
-        }
-      }
-    }
-
-    recordWriter = outputFormat.getRecordWriter(context)
-    taskAttemptContext = context
-  }
-
-  override def write(row: Row): Unit = recordWriter.write(null, row)
-
-  override def close(): Unit = recordWriter.close(taskAttemptContext)
-}
-
-private[sql] class FSBasedParquetRelation(
-    paths: Array[String],
-    private val maybeDataSchema: Option[StructType],
-    private val maybePartitionSpec: Option[PartitionSpec],
-    parameters: Map[String, String])(
-    val sqlContext: SQLContext)
-  extends FSBasedRelation(paths, maybePartitionSpec)
-  with Logging {
-
-  // Should we merge schemas from all Parquet part-files?
-  private val shouldMergeSchemas =
-    parameters.getOrElse(FSBasedParquetRelation.MERGE_SCHEMA, "true").toBoolean
-
-  private val maybeMetastoreSchema = parameters
-    .get(FSBasedParquetRelation.METASTORE_SCHEMA)
-    .map(DataType.fromJson(_).asInstanceOf[StructType])
-
-  private val metadataCache = new MetadataCache
-  metadataCache.refresh()
-
-  override def equals(other: scala.Any): Boolean = other match {
-    case that: FSBasedParquetRelation =>
-      val schemaEquality = if (shouldMergeSchemas) {
-        this.shouldMergeSchemas == that.shouldMergeSchemas
-      } else {
-        this.dataSchema == that.dataSchema &&
-          this.schema == that.schema
-      }
-
-      this.paths.toSet == that.paths.toSet &&
-        schemaEquality &&
-        this.maybeDataSchema == that.maybeDataSchema &&
-        this.partitionColumns == that.partitionColumns
-
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    if (shouldMergeSchemas) {
-      Objects.hashCode(
-        Boolean.box(shouldMergeSchemas),
-        paths.toSet,
-        maybeDataSchema,
-        maybePartitionSpec)
-    } else {
-      Objects.hashCode(
-        Boolean.box(shouldMergeSchemas),
-        paths.toSet,
-        dataSchema,
-        schema,
-        maybeDataSchema,
-        maybePartitionSpec)
-    }
-  }
-
-  override def outputWriterClass: Class[_ <: OutputWriter] = classOf[ParquetOutputWriter]
-
-  override def dataSchema: StructType = metadataCache.dataSchema
-
-  override private[sql] def refresh(): Unit = {
-    metadataCache.refresh()
-    super.refresh()
-  }
-
-  // Parquet data source always uses Catalyst internal representations.
-  override val needConversion: Boolean = false
-
-  override val sizeInBytes = metadataCache.dataStatuses.map(_.getLen).sum
-
-  override def prepareForWrite(job: Job): Unit = {
-    val conf = ContextUtil.getConfiguration(job)
-
-    val committerClass =
-      conf.getClass(
-        "spark.sql.parquet.output.committer.class",
-        classOf[ParquetOutputCommitter],
-        classOf[ParquetOutputCommitter])
-
-    conf.setClass(
-      "mapred.output.committer.class",
-      committerClass,
-      classOf[ParquetOutputCommitter])
-
-    // TODO There's no need to use two kinds of WriteSupport
-    // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
-    // complex types.
-    val writeSupportClass =
-      if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
-        classOf[MutableRowWriteSupport]
-      } else {
-        classOf[RowWriteSupport]
-      }
-
-    ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)
-    RowWriteSupport.setSchema(dataSchema.toAttributes, conf)
-
-    // Sets compression scheme
-    conf.set(
-      ParquetOutputFormat.COMPRESSION,
-      ParquetRelation
-        .shortParquetCompressionCodecNames
-        .getOrElse(
-          sqlContext.conf.parquetCompressionCodec.toUpperCase,
-          CompressionCodecName.UNCOMPRESSED).name())
-  }
-
-  override def buildScan(
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      inputPaths: Array[String]): RDD[Row] = {
-
-    val job = new Job(SparkHadoopUtil.get.conf)
-    val conf = ContextUtil.getConfiguration(job)
-
-    ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
-
-    if (inputPaths.nonEmpty) {
-      FileInputFormat.setInputPaths(job, inputPaths.map(new Path(_)): _*)
-    }
-
-    // Try to push down filters when filter push-down is enabled.
-    if (sqlContext.conf.parquetFilterPushDown) {
-      filters
-        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-        // is used here.
-        .flatMap(ParquetFilters.createFilter(dataSchema, _))
-        .reduceOption(FilterApi.and)
-        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
-    }
-
-    conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
-      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
-      ParquetTypesConverter.convertToString(requestedSchema.toAttributes)
-    })
-
-    conf.set(
-      RowWriteSupport.SPARK_ROW_SCHEMA,
-      ParquetTypesConverter.convertToString(dataSchema.toAttributes))
-
-    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
-    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
-    conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
-
-    val inputFileStatuses =
-      metadataCache.dataStatuses.filter(f => inputPaths.contains(f.getPath.toString))
-
-    val footers = inputFileStatuses.map(metadataCache.footers)
-
-    // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
-    // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
-    // footers.  Especially when a global arbitrative schema (either from metastore or data source
-    // DDL) is available.
-    new NewHadoopRDD(
-      sqlContext.sparkContext,
-      classOf[FilteringParquetRowInputFormat],
-      classOf[Void],
-      classOf[Row],
-      conf) {
-
-      val cacheMetadata = useMetadataCache
-
-      @transient val cachedStatuses = inputFileStatuses.map { f =>
-        // In order to encode the authority of a Path containing special characters such as /,
-        // we need to use the string returned by the URI of the path to create a new Path.
-        val pathWithAuthority = new Path(f.getPath.toUri.toString)
-
-        new FileStatus(
-          f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
-          f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
-      }.toSeq
-
-      @transient val cachedFooters = footers.map { f =>
-        // In order to encode the authority of a Path containing special characters such as /,
-        // we need to use the string returned by the URI of the path to create a new Path.
-        new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
-      }.toSeq
-
-      // Overridden so we can inject our own cached files statuses.
-      override def getPartitions: Array[SparkPartition] = {
-        val inputFormat = if (cacheMetadata) {
-          new FilteringParquetRowInputFormat {
-            override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
-
-            override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
-          }
-        } else {
-          new FilteringParquetRowInputFormat
-        }
-
-        val jobContext = newJobContext(getConf, jobId)
-        val rawSplits = inputFormat.getSplits(jobContext)
-
-        Array.tabulate[SparkPartition](rawSplits.size) { i =>
-          new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
-        }
-      }
-    }.values
-  }
-
-  private class MetadataCache {
-    // `FileStatus` objects of all "_metadata" files.
-    private var metadataStatuses: Array[FileStatus] = _
-
-    // `FileStatus` objects of all "_common_metadata" files.
-    private var commonMetadataStatuses: Array[FileStatus] = _
-
-    // Parquet footer cache.
-    var footers: Map[FileStatus, Footer] = _
-
-    // `FileStatus` objects of all data files (Parquet part-files).
-    var dataStatuses: Array[FileStatus] = _
-
-    // Schema of the actual Parquet files, without partition columns discovered from partition
-    // directory paths.
-    var dataSchema: StructType = _
-
-    // Schema of the whole table, including partition columns.
-    var schema: StructType = _
-
-    /**
-     * Refreshes `FileStatus`es, footers, partition spec, and table schema.
-     */
-    def refresh(): Unit = {
-      // Support either reading a collection of raw Parquet part-files, or a collection of folders
-      // containing Parquet files (e.g. partitioned Parquet table).
-      val baseStatuses = paths.distinct.flatMap { p =>
-        val path = new Path(p)
-        val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-        val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        Try(fs.getFileStatus(qualified)).toOption
-      }
-      assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir))
-
-      // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
-      val leaves = baseStatuses.flatMap { f =>
-        val fs = FileSystem.get(f.getPath.toUri, SparkHadoopUtil.get.conf)
-        SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
-          isSummaryFile(f.getPath) ||
-            !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
-        }
-      }
-
-      dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
-      metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
-      commonMetadataStatuses =
-        leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
-
-      footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
-        val parquetMetadata = ParquetFileReader.readFooter(
-          SparkHadoopUtil.get.conf, f, ParquetMetadataConverter.NO_FILTER)
-        f -> new Footer(f.getPath, parquetMetadata)
-      }.seq.toMap
-
-      dataSchema = {
-        val dataSchema0 =
-          maybeDataSchema
-            .orElse(readSchema())
-            .orElse(maybeMetastoreSchema)
-            .getOrElse(sys.error("Failed to get the schema."))
-
-        // If this Parquet relation is converted from a Hive Metastore table, must reconcile the
-        // case insensitivity issue and possible schema mismatch (probably caused by schema
-        // evolution).
-        maybeMetastoreSchema
-          .map(FSBasedParquetRelation.mergeMetastoreParquetSchema(_, dataSchema0))
-          .getOrElse(dataSchema0)
-      }
-    }
-
-    private def isSummaryFile(file: Path): Boolean = {
-      file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
-        file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
-    }
-
-    private def readSchema(): Option[StructType] = {
-      // Sees which file(s) we need to touch in order to figure out the schema.
-      //
-      // Always tries the summary files first if users don't require a merged schema.  In this case,
-      // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
-      // groups information, and could be much smaller for large Parquet files with lots of row
-      // groups.
-      //
-      // NOTE: Metadata stored in the summary files are merged from all part-files.  However, for
-      // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
-      // how to merge them correctly if some key is associated with different values in different
-      // part-files.  When this happens, Parquet simply gives up generating the summary file.  This
-      // implies that if a summary file is present, then:
-      //
-      //   1. Either all part-files have exactly the same Spark SQL schema, or
-      //   2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
-      //      their schemas may differ from each other).
-      //
-      // Here we tend to be pessimistic and take the second case into account.  Basically this means
-      // we can't trust the summary files if users require a merged schema, and must touch all part-
-      // files to do the merge.
-      val filesToTouch =
-        if (shouldMergeSchemas) {
-          // Also includes summary files, 'cause there might be empty partition directories.
-          (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
-        } else {
-          // Tries any "_common_metadata" first. Parquet files written by old versions of Parquet
-          // don't have this.
-          commonMetadataStatuses.headOption
-            // Falls back to "_metadata"
-            .orElse(metadataStatuses.headOption)
-            // Summary file(s) not found, the Parquet file is either corrupted, or different part-
-            // files contain conflicting user defined metadata (two or more values are associated
-            // with a same key in different files).  In either case, we fall back to any of the
-            // first part-file, and just assume all schemas are consistent.
-            .orElse(dataStatuses.headOption)
-            .toSeq
-        }
-
-      assert(
-        filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined,
-        "No schema defined, " +
-          s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")
-
-      FSBasedParquetRelation.readSchema(filesToTouch.map(footers.apply), sqlContext)
-    }
-  }
-}
-
-private[sql] object FSBasedParquetRelation extends Logging {
-  // Whether we should merge schemas collected from all Parquet part-files.
-  private[sql] val MERGE_SCHEMA = "mergeSchema"
-
-  // Hive Metastore schema, used when converting Metastore Parquet tables.  This option is only used
-  // internally.
-  private[sql] val METASTORE_SCHEMA = "metastoreSchema"
-
-  private[parquet] def readSchema(
-      footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
-    footers.map { footer =>
-      val metadata = footer.getParquetMetadata.getFileMetaData
-      val parquetSchema = metadata.getSchema
-      val maybeSparkSchema = metadata
-        .getKeyValueMetaData
-        .toMap
-        .get(RowReadSupport.SPARK_METADATA_KEY)
-        .flatMap { serializedSchema =>
-          // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to
-          // whatever is available.
-          Try(DataType.fromJson(serializedSchema))
-            .recover { case _: Throwable =>
-              logInfo(
-                s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
-                  "falling back to the deprecated DataType.fromCaseClassString parser.")
-              DataType.fromCaseClassString(serializedSchema)
-            }
-            .recover { case cause: Throwable =>
-              logWarning(
-                s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
-                   |\t$serializedSchema
-                 """.stripMargin,
-                cause)
-            }
-            .map(_.asInstanceOf[StructType])
-            .toOption
-        }
-
-      maybeSparkSchema.getOrElse {
-        // Falls back to Parquet schema if Spark SQL schema is absent.
-        StructType.fromAttributes(
-          // TODO Really no need to use `Attribute` here, we only need to know the data type.
-          ParquetTypesConverter.convertToAttributes(
-            parquetSchema,
-            sqlContext.conf.isParquetBinaryAsString,
-            sqlContext.conf.isParquetINT96AsTimestamp))
-      }
-    }.reduceOption { (left, right) =>
-      try left.merge(right) catch { case e: Throwable =>
-        throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
-      }
-    }
-  }
-
-  /**
-   * Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore
-   * schema and Parquet schema.
-   *
-   * Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the
-   * schema read from Parquet files may be incomplete (e.g. older versions of Parquet don't
-   * distinguish binary and string).  This method generates a correct schema by merging Metastore
-   * schema data types and Parquet schema field names.
-   */
-  private[parquet] def mergeMetastoreParquetSchema(
-      metastoreSchema: StructType,
-      parquetSchema: StructType): StructType = {
-    def schemaConflictMessage: String =
-      s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
-         |${metastoreSchema.prettyJson}
-         |
-         |Parquet schema:
-         |${parquetSchema.prettyJson}
-       """.stripMargin
-
-    val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)
-
-    assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)
-
-    val ordinalMap = metastoreSchema.zipWithIndex.map {
-      case (field, index) => field.name.toLowerCase -> index
-    }.toMap
-
-    val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
-      ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
-
-    StructType(metastoreSchema.zip(reorderedParquetSchema).map {
-      // Uses Parquet field names but retains Metastore data types.
-      case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
-        mSchema.copy(name = pSchema.name)
-      case _ =>
-        throw new SparkException(schemaConflictMessage)
-    })
-  }
-
-  /**
-   * Returns the original schema from the Parquet file with any missing nullable fields from the
-   * Hive Metastore schema merged in.
-   *
-   * When constructing a DataFrame from a collection of structured data, the resulting object has
-   * a schema corresponding to the union of the fields present in each element of the collection.
-   * Spark SQL simply assigns a null value to any field that isn't present for a particular row.
-   * In some cases, it is possible that a given table partition stored as a Parquet file doesn't
-   * contain a particular nullable field in its schema despite that field being present in the
-   * table schema obtained from the Hive Metastore. This method returns a schema representing the
-   * Parquet file schema along with any additional nullable fields from the Metastore schema
-   * merged in.
-   */
-  private[parquet] def mergeMissingNullableFields(
-      metastoreSchema: StructType,
-      parquetSchema: StructType): StructType = {
-    val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
-    val missingFields = metastoreSchema
-      .map(_.name.toLowerCase)
-      .diff(parquetSchema.map(_.name.toLowerCase))
-      .map(fieldMap(_))
-      .filter(_.nullable)
-    StructType(parquetSchema ++ missingFields)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
new file mode 100644
index 0000000..946062f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+import scala.util.Try
+
+import com.google.common.base.Objects
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import parquet.filter2.predicate.FilterApi
+import parquet.format.converter.ParquetMetadataConverter
+import parquet.hadoop._
+import parquet.hadoop.metadata.CompressionCodecName
+import parquet.hadoop.util.ContextUtil
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD._
+import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.{Row, SQLConf, SQLContext}
+import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
+
+private[sql] class DefaultSource extends HadoopFsRelationProvider {
+  override def createRelation(
+      sqlContext: SQLContext,
+      paths: Array[String],
+      schema: Option[StructType],
+      partitionColumns: Option[StructType],
+      parameters: Map[String, String]): HadoopFsRelation = {
+    val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty))
+    new ParquetRelation2(paths, schema, partitionSpec, parameters)(sqlContext)
+  }
+}
+
+// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
+private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext)
+  extends OutputWriter {
+
+  private val recordWriter: RecordWriter[Void, Row] = {
+    val conf = context.getConfiguration
+    val outputFormat = {
+      // When appending new Parquet files to an existing Parquet file directory, to avoid
+      // overwriting existing data files, we need to find out the max task ID encoded in these data
+      // file names.
+      // TODO Make this snippet a utility function for other data source developers
+      val maxExistingTaskId = {
+        // Note that `path` may point to a temporary location.  Here we retrieve the real
+        // destination path from the configuration
+        val outputPath = new Path(conf.get("spark.sql.sources.output.path"))
+        val fs = outputPath.getFileSystem(conf)
+
+        if (fs.exists(outputPath)) {
+          // Pattern used to match task ID in part file names, e.g.:
+          //
+          //   part-r-00001.gz.parquet
+          //          ^~~~~
+          val partFilePattern = """part-.-(\d{1,}).*""".r
+
+          fs.listStatus(outputPath).map(_.getPath.getName).map {
+            case partFilePattern(id) => id.toInt
+            case name if name.startsWith("_") => 0
+            case name if name.startsWith(".") => 0
+            case name => sys.error(
+              s"Trying to write Parquet files to directory $outputPath, " +
+                s"but found items with illegal name '$name'.")
+          }.reduceOption(_ max _).getOrElse(0)
+        } else {
+          0
+        }
+      }
+
+      new ParquetOutputFormat[Row]() {
+        // Here we override `getDefaultWorkFile` for two reasons:
+        //
+        //  1. To allow appending.  We need to generate output file name based on the max available
+        //     task ID computed above.
+        //
+        //  2. To allow dynamic partitioning.  Default `getDefaultWorkFile` uses
+        //     `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
+        //     partitions in the case of dynamic partitioning.
+        override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+          val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1
+          new Path(path, f"part-r-$split%05d$extension")
+        }
+      }
+    }
+
+    outputFormat.getRecordWriter(context)
+  }
+
+  override def write(row: Row): Unit = recordWriter.write(null, row)
+
+  override def close(): Unit = recordWriter.close(context)
+}
+
+private[sql] class ParquetRelation2(
+    override val paths: Array[String],
+    private val maybeDataSchema: Option[StructType],
+    private val maybePartitionSpec: Option[PartitionSpec],
+    parameters: Map[String, String])(
+    val sqlContext: SQLContext)
+  extends HadoopFsRelation(maybePartitionSpec)
+  with Logging {
+
+  // Should we merge schemas from all Parquet part-files?
+  private val shouldMergeSchemas =
+    parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean
+
+  private val maybeMetastoreSchema = parameters
+    .get(ParquetRelation2.METASTORE_SCHEMA)
+    .map(DataType.fromJson(_).asInstanceOf[StructType])
+
+  private lazy val metadataCache: MetadataCache = {
+    val meta = new MetadataCache
+    meta.refresh()
+    meta
+  }
+
+  override def equals(other: scala.Any): Boolean = other match {
+    case that: ParquetRelation2 =>
+      val schemaEquality = if (shouldMergeSchemas) {
+        this.shouldMergeSchemas == that.shouldMergeSchemas
+      } else {
+        this.dataSchema == that.dataSchema &&
+          this.schema == that.schema
+      }
+
+      this.paths.toSet == that.paths.toSet &&
+        schemaEquality &&
+        this.maybeDataSchema == that.maybeDataSchema &&
+        this.partitionColumns == that.partitionColumns
+
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    if (shouldMergeSchemas) {
+      Objects.hashCode(
+        Boolean.box(shouldMergeSchemas),
+        paths.toSet,
+        maybeDataSchema,
+        maybePartitionSpec)
+    } else {
+      Objects.hashCode(
+        Boolean.box(shouldMergeSchemas),
+        paths.toSet,
+        dataSchema,
+        schema,
+        maybeDataSchema,
+        maybePartitionSpec)
+    }
+  }
+
+  override def dataSchema: StructType = metadataCache.dataSchema
+
+  override private[sql] def refresh(): Unit = {
+    metadataCache.refresh()
+    super.refresh()
+  }
+
+  // Parquet data source always uses Catalyst internal representations.
+  override val needConversion: Boolean = false
+
+  override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum
+
+  override def userDefinedPartitionColumns: Option[StructType] =
+    maybePartitionSpec.map(_.partitionColumns)
+
+  override def prepareJobForWrite(job: Job): OutputWriterFactory = {
+    val conf = ContextUtil.getConfiguration(job)
+
+    val committerClass =
+      conf.getClass(
+        "spark.sql.parquet.output.committer.class",
+        classOf[ParquetOutputCommitter],
+        classOf[ParquetOutputCommitter])
+
+    conf.setClass(
+      "mapred.output.committer.class",
+      committerClass,
+      classOf[ParquetOutputCommitter])
+
+    // TODO There's no need to use two kinds of WriteSupport
+    // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
+    // complex types.
+    val writeSupportClass =
+      if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
+        classOf[MutableRowWriteSupport]
+      } else {
+        classOf[RowWriteSupport]
+      }
+
+    ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)
+    RowWriteSupport.setSchema(dataSchema.toAttributes, conf)
+
+    // Sets compression scheme
+    conf.set(
+      ParquetOutputFormat.COMPRESSION,
+      ParquetRelation
+        .shortParquetCompressionCodecNames
+        .getOrElse(
+          sqlContext.conf.parquetCompressionCodec.toUpperCase,
+          CompressionCodecName.UNCOMPRESSED).name())
+
+    new OutputWriterFactory {
+      override def newInstance(
+          path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = {
+        new ParquetOutputWriter(path, context)
+      }
+    }
+  }
+
+  override def buildScan(
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      inputPaths: Array[String]): RDD[Row] = {
+
+    val job = new Job(SparkHadoopUtil.get.conf)
+    val conf = ContextUtil.getConfiguration(job)
+
+    ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
+
+    if (inputPaths.nonEmpty) {
+      FileInputFormat.setInputPaths(job, inputPaths.map(new Path(_)): _*)
+    }
+
+    // Try to push down filters when filter push-down is enabled.
+    if (sqlContext.conf.parquetFilterPushDown) {
+      filters
+        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+        // is used here.
+        .flatMap(ParquetFilters.createFilter(dataSchema, _))
+        .reduceOption(FilterApi.and)
+        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
+    }
+
+    conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
+      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
+      ParquetTypesConverter.convertToString(requestedSchema.toAttributes)
+    })
+
+    conf.set(
+      RowWriteSupport.SPARK_ROW_SCHEMA,
+      ParquetTypesConverter.convertToString(dataSchema.toAttributes))
+
+    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
+    conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
+
+    val inputFileStatuses =
+      metadataCache.dataStatuses.filter(f => inputPaths.contains(f.getPath.toString))
+
+    val footers = inputFileStatuses.map(metadataCache.footers)
+
+    // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
+    // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
+    // footers.  Especially when a global arbitrative schema (either from metastore or data source
+    // DDL) is available.
+    new NewHadoopRDD(
+      sqlContext.sparkContext,
+      classOf[FilteringParquetRowInputFormat],
+      classOf[Void],
+      classOf[Row],
+      conf) {
+
+      val cacheMetadata = useMetadataCache
+
+      @transient val cachedStatuses = inputFileStatuses.map { f =>
+        // In order to encode the authority of a Path containing special characters such as /,
+        // we need to use the string returned by the URI of the path to create a new Path.
+        val pathWithAuthority = new Path(f.getPath.toUri.toString)
+
+        new FileStatus(
+          f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
+          f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
+      }.toSeq
+
+      @transient val cachedFooters = footers.map { f =>
+        // In order to encode the authority of a Path containing special characters such as /,
+        // we need to use the string returned by the URI of the path to create a new Path.
+        new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
+      }.toSeq
+
+      // Overridden so we can inject our own cached file statuses.
+      override def getPartitions: Array[SparkPartition] = {
+        val inputFormat = if (cacheMetadata) {
+          new FilteringParquetRowInputFormat {
+            override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
+
+            override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
+          }
+        } else {
+          new FilteringParquetRowInputFormat
+        }
+
+        val jobContext = newJobContext(getConf, jobId)
+        val rawSplits = inputFormat.getSplits(jobContext)
+
+        Array.tabulate[SparkPartition](rawSplits.size) { i =>
+          new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+        }
+      }
+    }.values
+  }
+
+  private class MetadataCache {
+    // `FileStatus` objects of all "_metadata" files.
+    private var metadataStatuses: Array[FileStatus] = _
+
+    // `FileStatus` objects of all "_common_metadata" files.
+    private var commonMetadataStatuses: Array[FileStatus] = _
+
+    // Parquet footer cache.
+    var footers: Map[FileStatus, Footer] = _
+
+    // `FileStatus` objects of all data files (Parquet part-files).
+    var dataStatuses: Array[FileStatus] = _
+
+    // Schema of the actual Parquet files, without partition columns discovered from partition
+    // directory paths.
+    var dataSchema: StructType = _
+
+    // Schema of the whole table, including partition columns.
+    var schema: StructType = _
+
+    /**
+     * Refreshes `FileStatus`es, footers, and data schema.
+     */
+    def refresh(): Unit = {
+      // Support either reading a collection of raw Parquet part-files, or a collection of folders
+      // containing Parquet files (e.g. partitioned Parquet table).
+      val baseStatuses = paths.distinct.flatMap { p =>
+        val path = new Path(p)
+        val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+        val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+        Try(fs.getFileStatus(qualified)).toOption
+      }
+      assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir))
+
+      // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
+      val leaves = baseStatuses.flatMap { f =>
+        val fs = FileSystem.get(f.getPath.toUri, SparkHadoopUtil.get.conf)
+        SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
+          isSummaryFile(f.getPath) ||
+            !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
+        }
+      }
+
+      dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
+      metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
+      commonMetadataStatuses =
+        leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
+
+      footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
+        val parquetMetadata = ParquetFileReader.readFooter(
+          SparkHadoopUtil.get.conf, f, ParquetMetadataConverter.NO_FILTER)
+        f -> new Footer(f.getPath, parquetMetadata)
+      }.seq.toMap
+
+      dataSchema = {
+        val dataSchema0 =
+          maybeDataSchema
+            .orElse(readSchema())
+            .orElse(maybeMetastoreSchema)
+            .getOrElse(sys.error("Failed to get the schema."))
+
+        // If this Parquet relation is converted from a Hive Metastore table, must reconcile case
+        // insensitivity issue and possible schema mismatch (probably caused by schema
+        // evolution).
+        maybeMetastoreSchema
+          .map(ParquetRelation2.mergeMetastoreParquetSchema(_, dataSchema0))
+          .getOrElse(dataSchema0)
+      }
+    }
+
+    private def isSummaryFile(file: Path): Boolean = {
+      file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
+        file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
+    }
+
+    private def readSchema(): Option[StructType] = {
+      // Sees which file(s) we need to touch in order to figure out the schema.
+      //
+      // Always tries the summary files first if users don't require a merged schema.  In this case,
+      // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
+      // groups information, and could be much smaller for large Parquet files with lots of row
+      // groups.
+      //
+      // NOTE: Metadata stored in the summary files are merged from all part-files.  However, for
+      // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
+      // how to merge them correctly if some key is associated with different values in different
+      // part-files.  When this happens, Parquet simply gives up generating the summary file.  This
+      // implies that if a summary file is present, then:
+      //
+      //   1. Either all part-files have exactly the same Spark SQL schema, or
+      //   2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
+      //      their schemas may differ from each other).
+      //
+      // Here we tend to be pessimistic and take the second case into account.  Basically this means
+      // we can't trust the summary files if users require a merged schema, and must touch all part-
+      // files to do the merge.
+      val filesToTouch =
+        if (shouldMergeSchemas) {
+          // Also includes summary files, 'cause there might be empty partition directories.
+          (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
+        } else {
+          // Tries any "_common_metadata" first. Parquet files written by old versions of Parquet
+          // don't have this.
+          commonMetadataStatuses.headOption
+            // Falls back to "_metadata"
+            .orElse(metadataStatuses.headOption)
+            // Summary file(s) not found, the Parquet file is either corrupted, or different part-
+            // files contain conflicting user defined metadata (two or more values are associated
+            // with the same key in different files).  In either case, we fall back to the
+            // first part-file, and just assume all schemas are consistent.
+            .orElse(dataStatuses.headOption)
+            .toSeq
+        }
+
+      assert(
+        filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined,
+        "No schema defined, " +
+          s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")
+
+      ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
+    }
+  }
+}
+
+private[sql] object ParquetRelation2 extends Logging {
+  // Whether we should merge schemas collected from all Parquet part-files.
+  private[sql] val MERGE_SCHEMA = "mergeSchema"
+
+  // Hive Metastore schema, used when converting Metastore Parquet tables.  This option is only used
+  // internally.
+  private[sql] val METASTORE_SCHEMA = "metastoreSchema"
+
+  private[parquet] def readSchema(
+      footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
+    footers.map { footer =>
+      val metadata = footer.getParquetMetadata.getFileMetaData
+      val parquetSchema = metadata.getSchema
+      val maybeSparkSchema = metadata
+        .getKeyValueMetaData
+        .toMap
+        .get(RowReadSupport.SPARK_METADATA_KEY)
+        .flatMap { serializedSchema =>
+          // Don't throw even if we failed to parse the serialized Spark schema. Just fall back to
+          // whatever is available.
+          Try(DataType.fromJson(serializedSchema))
+            .recover { case _: Throwable =>
+              logInfo(
+                s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+                  "falling back to the deprecated DataType.fromCaseClassString parser.")
+              DataType.fromCaseClassString(serializedSchema)
+            }
+            .recover { case cause: Throwable =>
+              logWarning(
+                s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
+                   |\t$serializedSchema
+                 """.stripMargin,
+                cause)
+            }
+            .map(_.asInstanceOf[StructType])
+            .toOption
+        }
+
+      maybeSparkSchema.getOrElse {
+        // Falls back to Parquet schema if Spark SQL schema is absent.
+        StructType.fromAttributes(
+          // TODO Really no need to use `Attribute` here, we only need to know the data type.
+          ParquetTypesConverter.convertToAttributes(
+            parquetSchema,
+            sqlContext.conf.isParquetBinaryAsString,
+            sqlContext.conf.isParquetINT96AsTimestamp))
+      }
+    }.reduceOption { (left, right) =>
+      try left.merge(right) catch { case e: Throwable =>
+        throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
+      }
+    }
+  }
+
+  /**
+   * Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore
+   * schema and Parquet schema.
+   *
+   * Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the
+   * schema read from Parquet files may be incomplete (e.g. older versions of Parquet don't
+   * distinguish binary and string).  This method generates a correct schema by merging Metastore
+   * schema data types and Parquet schema field names.
+   */
+  private[parquet] def mergeMetastoreParquetSchema(
+      metastoreSchema: StructType,
+      parquetSchema: StructType): StructType = {
+    def schemaConflictMessage: String =
+      s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
+         |${metastoreSchema.prettyJson}
+         |
+         |Parquet schema:
+         |${parquetSchema.prettyJson}
+       """.stripMargin
+
+    val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)
+
+    assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)
+
+    val ordinalMap = metastoreSchema.zipWithIndex.map {
+      case (field, index) => field.name.toLowerCase -> index
+    }.toMap
+
+    val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
+      ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
+
+    StructType(metastoreSchema.zip(reorderedParquetSchema).map {
+      // Uses Parquet field names but retains Metastore data types.
+      case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
+        mSchema.copy(name = pSchema.name)
+      case _ =>
+        throw new SparkException(schemaConflictMessage)
+    })
+  }
+
+  /**
+   * Returns the original schema from the Parquet file with any missing nullable fields from the
+   * Hive Metastore schema merged in.
+   *
+   * When constructing a DataFrame from a collection of structured data, the resulting object has
+   * a schema corresponding to the union of the fields present in each element of the collection.
+   * Spark SQL simply assigns a null value to any field that isn't present for a particular row.
+   * In some cases, it is possible that a given table partition stored as a Parquet file doesn't
+   * contain a particular nullable field in its schema despite that field being present in the
+   * table schema obtained from the Hive Metastore. This method returns a schema representing the
+   * Parquet file schema along with any additional nullable fields from the Metastore schema
+   * merged in.
+   */
+  private[parquet] def mergeMissingNullableFields(
+      metastoreSchema: StructType,
+      parquetSchema: StructType): StructType = {
+    val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
+    val missingFields = metastoreSchema
+      .map(_.name.toLowerCase)
+      .diff(parquetSchema.map(_.name.toLowerCase))
+      .map(fieldMap(_))
+      .filter(_.nullable)
+    StructType(parquetSchema ++ missingFields)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index ee099ab..e6324b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -59,7 +59,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         (a, _) => t.buildScan(a)) :: Nil
 
     // Scanning partitioned FSBasedRelation
-    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: FSBasedRelation))
+    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation))
         if t.partitionSpec.partitionColumns.nonEmpty =>
       val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
 
@@ -87,7 +87,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         selectedPartitions) :: Nil
 
     // Scanning non-partitioned FSBasedRelation
-    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: FSBasedRelation)) =>
+    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
       val inputPaths = t.paths.map(new Path(_)).flatMap { path =>
         val fs = path.getFileSystem(t.sqlContext.sparkContext.hadoopConfiguration)
         val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
@@ -111,10 +111,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
 
     case i @ logical.InsertIntoTable(
-      l @ LogicalRelation(t: FSBasedRelation), part, query, overwrite, false) if part.isEmpty =>
+      l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) if part.isEmpty =>
       val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
       execution.ExecutedCommand(
-        InsertIntoFSBasedRelation(t, query, Array.empty[String], mode)) :: Nil
+        InsertIntoHadoopFsRelation(t, query, Array.empty[String], mode)) :: Nil
 
     case _ => Nil
   }
@@ -126,7 +126,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       partitionColumns: StructType,
       partitions: Array[Partition]) = {
     val output = projections.map(_.toAttribute)
-    val relation = logicalRelation.relation.asInstanceOf[FSBasedRelation]
+    val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]
 
     // Builds RDD[Row]s for each selected partition.
     val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index d30f7f6..d1f0cda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -35,6 +35,10 @@ private[sql] case class Partition(values: Row, path: String)
 private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition])
 
 private[sql] object PartitioningUtils {
+  // This duplicates the default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core
+  // doesn't depend on Hive.
+  private[sql] val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"
+
   private[sql] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) {
     require(columnNames.size == literals.size)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 7879328..a09bb08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -58,8 +58,8 @@ private[sql] case class InsertIntoDataSource(
   }
 }
 
-private[sql] case class InsertIntoFSBasedRelation(
-    @transient relation: FSBasedRelation,
+private[sql] case class InsertIntoHadoopFsRelation(
+    @transient relation: HadoopFsRelation,
     @transient query: LogicalPlan,
     partitionColumns: Array[String],
     mode: SaveMode)
@@ -102,7 +102,7 @@ private[sql] case class InsertIntoFSBasedRelation(
         insert(new DefaultWriterContainer(relation, job), df)
       } else {
         val writerContainer = new DynamicPartitionWriterContainer(
-          relation, job, partitionColumns, "__HIVE_DEFAULT_PARTITION__")
+          relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME)
         insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
       }
     }
@@ -234,7 +234,7 @@ private[sql] case class InsertIntoFSBasedRelation(
 }
 
 private[sql] abstract class BaseWriterContainer(
-    @transient val relation: FSBasedRelation,
+    @transient val relation: HadoopFsRelation,
     @transient job: Job)
   extends SparkHadoopMapReduceUtil
   with Logging
@@ -261,7 +261,7 @@ private[sql] abstract class BaseWriterContainer(
 
   protected val dataSchema = relation.dataSchema
 
-  protected val outputWriterClass: Class[_ <: OutputWriter] = relation.outputWriterClass
+  protected var outputWriterFactory: OutputWriterFactory = _
 
   private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _
 
@@ -269,7 +269,7 @@ private[sql] abstract class BaseWriterContainer(
     setupIDs(0, 0, 0)
     setupConf()
     taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
-    relation.prepareForWrite(job)
+    outputWriterFactory = relation.prepareJobForWrite(job)
     outputFormatClass = job.getOutputFormatClass
     outputCommitter = newOutputCommitter(taskAttemptContext)
     outputCommitter.setupJob(jobContext)
@@ -346,16 +346,15 @@ private[sql] abstract class BaseWriterContainer(
 }
 
 private[sql] class DefaultWriterContainer(
-    @transient relation: FSBasedRelation,
+    @transient relation: HadoopFsRelation,
     @transient job: Job)
   extends BaseWriterContainer(relation, job) {
 
   @transient private var writer: OutputWriter = _
 
   override protected def initWriters(): Unit = {
-    writer = outputWriterClass.newInstance()
     taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath)
-    writer.init(getWorkPath, dataSchema, taskAttemptContext)
+    writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
   }
 
   override def outputWriterForRow(row: Row): OutputWriter = writer
@@ -372,7 +371,7 @@ private[sql] class DefaultWriterContainer(
 }
 
 private[sql] class DynamicPartitionWriterContainer(
-    @transient relation: FSBasedRelation,
+    @transient relation: HadoopFsRelation,
     @transient job: Job,
     partitionColumns: Array[String],
     defaultPartitionName: String)
@@ -398,12 +397,10 @@ private[sql] class DynamicPartitionWriterContainer(
 
     outputWriters.getOrElseUpdate(partitionPath, {
       val path = new Path(getWorkPath, partitionPath)
-      val writer = outputWriterClass.newInstance()
       taskAttemptContext.getConfiguration.set(
         "spark.sql.sources.output.path",
         new Path(outputPath, partitionPath).toString)
-      writer.init(path.toString, dataSchema, taskAttemptContext)
-      writer
+      outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
     })
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 595c5eb..37a569d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -226,7 +226,7 @@ private[sql] object ResolvedDataSource {
       case Some(schema: StructType) => clazz.newInstance() match {
         case dataSource: SchemaRelationProvider =>
           dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
-        case dataSource: FSBasedRelationProvider =>
+        case dataSource: HadoopFsRelationProvider =>
           val maybePartitionsSchema = if (partitionColumns.isEmpty) {
             None
           } else {
@@ -256,7 +256,7 @@ private[sql] object ResolvedDataSource {
       case None => clazz.newInstance() match {
         case dataSource: RelationProvider =>
           dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
-        case dataSource: FSBasedRelationProvider =>
+        case dataSource: HadoopFsRelationProvider =>
           val caseInsensitiveOptions = new CaseInsensitiveMap(options)
           val paths = {
             val patternPath = new Path(caseInsensitiveOptions("path"))
@@ -296,7 +296,7 @@ private[sql] object ResolvedDataSource {
     val relation = clazz.newInstance() match {
       case dataSource: CreatableRelationProvider =>
         dataSource.createRelation(sqlContext, mode, options, data)
-      case dataSource: FSBasedRelationProvider =>
+      case dataSource: HadoopFsRelationProvider =>
         // Don't glob path for the write path.  The contracts here are:
         //  1. Only one output path can be specified on the write path;
         //  2. Output path must be a legal HDFS style file system path;
@@ -315,7 +315,7 @@ private[sql] object ResolvedDataSource {
           Some(partitionColumnsSchema(data.schema, partitionColumns)),
           caseInsensitiveOptions)
         sqlContext.executePlan(
-          InsertIntoFSBasedRelation(
+          InsertIntoHadoopFsRelation(
             r,
             data.logicalPlan,
             partitionColumns.toArray,

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 6f31530..274ab44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, _}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -94,7 +94,7 @@ trait SchemaRelationProvider {
  * ::DeveloperApi::
  * Implemented by objects that produce relations for a specific kind of data source
  * with a given schema and partitioned columns.  When Spark SQL is given a DDL operation with a
- * USING clause specified (to specify the implemented [[FSBasedRelationProvider]]), a user defined
+ * USING clause specified (to specify the implemented [[HadoopFsRelationProvider]]), a user defined
  * schema, and an optional list of partition columns, this interface is used to pass in the
  * parameters specified by a user.
  *
@@ -105,15 +105,15 @@ trait SchemaRelationProvider {
  *
  * A new instance of this class with be instantiated each time a DDL call is made.
  *
- * The difference between a [[RelationProvider]] and a [[FSBasedRelationProvider]] is
+ * The difference between a [[RelationProvider]] and a [[HadoopFsRelationProvider]] is
  * that users need to provide a schema and a (possibly empty) list of partition columns when
  * using a SchemaRelationProvider. A relation provider can inherits both [[RelationProvider]],
- * and [[FSBasedRelationProvider]] if it can support schema inference, user-specified
+ * and [[HadoopFsRelationProvider]] if it can support schema inference, user-specified
  * schemas, and accessing partitioned relations.
  *
  * @since 1.4.0
  */
-trait FSBasedRelationProvider {
+trait HadoopFsRelationProvider {
   /**
    * Returns a new base relation with the given parameters, a user defined schema, and a list of
    * partition columns. Note: the parameters' keywords are case insensitive and this insensitivity
@@ -124,7 +124,7 @@ trait FSBasedRelationProvider {
       paths: Array[String],
       schema: Option[StructType],
       partitionColumns: Option[StructType],
-      parameters: Map[String, String]): FSBasedRelation
+      parameters: Map[String, String]): HadoopFsRelation
 }
 
 /**
@@ -280,33 +280,42 @@ trait CatalystScan {
 
 /**
  * ::Experimental::
- * [[OutputWriter]] is used together with [[FSBasedRelation]] for persisting rows to the
- * underlying file system.  Subclasses of [[OutputWriter]] must provide a zero-argument constructor.
- * An [[OutputWriter]] instance is created and initialized when a new output file is opened on
- * executor side.  This instance is used to persist rows to this single output file.
+ * A factory that produces [[OutputWriter]]s.  A new [[OutputWriterFactory]] is created on driver
+ * side for each write job issued when writing to a [[HadoopFsRelation]], and then gets serialized
+ * to executor side to create actual [[OutputWriter]]s on the fly.
  *
  * @since 1.4.0
  */
 @Experimental
-abstract class OutputWriter {
+abstract class OutputWriterFactory extends Serializable {
   /**
-   * Initializes this [[OutputWriter]] before any rows are persisted.
+   * When writing to a [[HadoopFsRelation]], this method gets called by each task on executor side
+   * to instantiate new [[OutputWriter]]s.
    *
    * @param path Path of the file to which this [[OutputWriter]] is supposed to write.  Note that
    *        this may not point to the final output file.  For example, `FileOutputFormat` writes to
    *        temporary directories and then merge written files back to the final destination.  In
    *        this case, `path` points to a temporary output file under the temporary directory.
    * @param dataSchema Schema of the rows to be written. Partition columns are not included in the
-   *        schema if the corresponding relation is partitioned.
+   *        schema if the relation being written is partitioned.
    * @param context The Hadoop MapReduce task context.
    *
    * @since 1.4.0
    */
-  def init(
-      path: String,
-      dataSchema: StructType,
-      context: TaskAttemptContext): Unit = ()
+  def newInstance(path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter
+}
 
+/**
+ * ::Experimental::
+ * [[OutputWriter]] is used together with [[HadoopFsRelation]] for persisting rows to the
+ * underlying file system.  An [[OutputWriter]] instance is created by an [[OutputWriterFactory]]
+ * when a new output file is opened on executor side.  This instance is used to persist rows to
+ * this single output file.
+ *
+ * @since 1.4.0
+ */
+@Experimental
+abstract class OutputWriter {
   /**
    * Persists a single row.  Invoked on the executor side.  When writing to dynamically partitioned
    * tables, dynamic partition columns are not included in rows to be written.
@@ -333,74 +342,71 @@ abstract class OutputWriter {
  * filter using selected predicates before producing an RDD containing all matching tuples as
  * [[Row]] objects. In addition, when reading from Hive style partitioned tables stored in file
  * systems, it's able to discover partitioning information from the paths of input directories, and
- * perform partition pruning before start reading the data. Subclasses of [[FSBasedRelation()]] must
- * override one of the three `buildScan` methods to implement the read path.
+ * perform partition pruning before starting to read the data. Subclasses of [[HadoopFsRelation]]
+ * must override one of the three `buildScan` methods to implement the read path.
  *
  * For the write path, it provides the ability to write to both non-partitioned and partitioned
  * tables.  Directory layout of the partitioned tables is compatible with Hive.
  *
  * @constructor This constructor is for internal uses only. The [[PartitionSpec]] argument is for
  *              implementing metastore table conversion.
- * @param paths Base paths of this relation.  For partitioned relations, it should be the root
- *        directories of all partition directories.
- * @param maybePartitionSpec An [[FSBasedRelation]] can be created with an optional
+ *
+ * @param maybePartitionSpec A [[HadoopFsRelation]] can be created with an optional
  *        [[PartitionSpec]], so that partition discovery can be skipped.
  *
  * @since 1.4.0
  */
 @Experimental
-abstract class FSBasedRelation private[sql](
-    val paths: Array[String],
-    maybePartitionSpec: Option[PartitionSpec])
+abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
   extends BaseRelation {
 
+  def this() = this(None)
+
+  private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+
+  private val codegenEnabled = sqlContext.conf.codegenEnabled
+
+  private var _partitionSpec: PartitionSpec = _
+
+  final private[sql] def partitionSpec: PartitionSpec = {
+    if (_partitionSpec == null) {
+      _partitionSpec = maybePartitionSpec
+        .map(spec => spec.copy(partitionColumns = spec.partitionColumns.asNullable))
+        .orElse(userDefinedPartitionColumns.map(PartitionSpec(_, Array.empty[Partition])))
+        .getOrElse {
+        if (sqlContext.conf.partitionDiscoveryEnabled()) {
+          discoverPartitions()
+        } else {
+          PartitionSpec(StructType(Nil), Array.empty[Partition])
+        }
+      }
+    }
+    _partitionSpec
+  }
+
   /**
-   * Constructs an [[FSBasedRelation]].
-   *
-   * @param paths Base paths of this relation.  For partitioned relations, it should be either root
-   *        directories of all partition directories.
-   * @param partitionColumns Partition columns of this relation.
+   * Base paths of this relation.  For partitioned relations, it should be the root directories
+   * of all partition directories.
    *
    * @since 1.4.0
    */
-  def this(paths: Array[String], partitionColumns: StructType) =
-    this(paths, {
-      if (partitionColumns.isEmpty) None
-      else Some(PartitionSpec(partitionColumns, Array.empty[Partition]))
-    })
+  def paths: Array[String]
 
   /**
-   * Constructs an [[FSBasedRelation]].
-   *
-   * @param paths Base paths of this relation.  For partitioned relations, it should be root
-   *        directories of all partition directories.
+   * Partition columns.  Can be either defined by [[userDefinedPartitionColumns]] or automatically
+   * discovered.  Note that they should always be nullable.
    *
    * @since 1.4.0
    */
-  def this(paths: Array[String]) = this(paths, None)
-
-  private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
-
-  private val codegenEnabled = sqlContext.conf.codegenEnabled
-
-  private var _partitionSpec: PartitionSpec = maybePartitionSpec.map { spec =>
-    spec.copy(partitionColumns = spec.partitionColumns.asNullable)
-  }.getOrElse {
-    if (sqlContext.conf.partitionDiscoveryEnabled()) {
-      discoverPartitions()
-    } else {
-      PartitionSpec(StructType(Nil), Array.empty[Partition])
-    }
-  }
-
-  private[sql] def partitionSpec: PartitionSpec = _partitionSpec
+  final def partitionColumns: StructType =
+    userDefinedPartitionColumns.getOrElse(partitionSpec.partitionColumns)
 
   /**
-   * Partition columns. Note that they are always nullable.
+   * Optional user defined partition columns.
    *
    * @since 1.4.0
    */
-  def partitionColumns: StructType = partitionSpec.partitionColumns
+  def userDefinedPartitionColumns: Option[StructType] = None
 
   private[sql] def refresh(): Unit = {
     if (sqlContext.conf.partitionDiscoveryEnabled()) {
@@ -419,7 +425,7 @@ abstract class FSBasedRelation private[sql](
     }.map(_.getPath)
 
     if (leafDirs.nonEmpty) {
-      PartitioningUtils.parsePartitions(leafDirs, "__HIVE_DEFAULT_PARTITION__")
+      PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
     } else {
       PartitionSpec(StructType(Array.empty[StructField]), Array.empty[Partition])
     }
@@ -458,7 +464,7 @@ abstract class FSBasedRelation private[sql](
    * @since 1.4.0
    */
   def buildScan(inputPaths: Array[String]): RDD[Row] = {
-    throw new RuntimeException(
+    throw new UnsupportedOperationException(
       "At least one buildScan() method should be overridden to read the relation.")
   }
 
@@ -520,8 +526,8 @@ abstract class FSBasedRelation private[sql](
   }
 
   /**
-   * Client side preparation for data writing can be put here.  For example, user defined output
-   * committer can be configured here.
+   * Prepares a write job and returns an [[OutputWriterFactory]].  Client side job preparation can
+   * be put here.  For example, user defined output committer can be configured here.
    *
    * Note that the only side effect expected here is mutating `job` via its setters.  Especially,
    * Spark SQL caches [[BaseRelation]] instances for performance, mutating relation internal states
@@ -529,13 +535,5 @@ abstract class FSBasedRelation private[sql](
    *
    * @since 1.4.0
    */
-  def prepareForWrite(job: Job): Unit = ()
-
-  /**
-   * This method is responsible for producing a new [[OutputWriter]] for each newly opened output
-   * file on the executor side.
-   *
-   * @since 1.4.0
-   */
-  def outputWriterClass: Class[_ <: OutputWriter]
+  def prepareJobForWrite(job: Job): OutputWriterFactory
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
index aad1d24..1eacdde 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
@@ -102,7 +102,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
         }
 
       case logical.InsertIntoTable(LogicalRelation(_: InsertableRelation), _, _, _, _) => // OK
-      case logical.InsertIntoTable(LogicalRelation(_: FSBasedRelation), _, _, _, _) => // OK
+      case logical.InsertIntoTable(LogicalRelation(_: HadoopFsRelation), _, _, _, _) => // OK
       case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) =>
         // The relation in l is not an InsertableRelation.
         failAnalysis(s"$l does not allow insertion.")

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index 3bbc5b0..5ad4395 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -63,7 +63,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
         }.flatten.reduceOption(_ && _)
 
         val forParquetDataSource = query.queryExecution.optimizedPlan.collect {
-          case PhysicalOperation(_, filters, LogicalRelation(_: FSBasedParquetRelation)) => filters
+          case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation2)) => filters
         }.flatten.reduceOption(_ && _)
 
         forParquetTableScan.orElse(forParquetDataSource)

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index fc90e3e..c964b6d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -204,7 +204,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
         StructField("lowerCase", StringType),
         StructField("UPPERCase", DoubleType, nullable = false)))) {
 
-      FSBasedParquetRelation.mergeMetastoreParquetSchema(
+      ParquetRelation2.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("lowercase", StringType),
           StructField("uppercase", DoubleType, nullable = false))),
@@ -219,7 +219,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
       StructType(Seq(
         StructField("UPPERCase", DoubleType, nullable = false)))) {
 
-      FSBasedParquetRelation.mergeMetastoreParquetSchema(
+      ParquetRelation2.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("uppercase", DoubleType, nullable = false))),
 
@@ -230,7 +230,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
 
     // Metastore schema contains additional non-nullable fields.
     assert(intercept[Throwable] {
-      FSBasedParquetRelation.mergeMetastoreParquetSchema(
+      ParquetRelation2.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("uppercase", DoubleType, nullable = false),
           StructField("lowerCase", BinaryType, nullable = false))),
@@ -241,7 +241,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
 
     // Conflicting non-nullable field names
     intercept[Throwable] {
-      FSBasedParquetRelation.mergeMetastoreParquetSchema(
+      ParquetRelation2.mergeMetastoreParquetSchema(
         StructType(Seq(StructField("lower", StringType, nullable = false))),
         StructType(Seq(StructField("lowerCase", BinaryType))))
     }
@@ -255,7 +255,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
         StructField("firstField", StringType, nullable = true),
         StructField("secondField", StringType, nullable = true),
         StructField("thirdfield", StringType, nullable = true)))) {
-      FSBasedParquetRelation.mergeMetastoreParquetSchema(
+      ParquetRelation2.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("firstfield", StringType, nullable = true),
           StructField("secondfield", StringType, nullable = true),
@@ -268,7 +268,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
     // Merge should fail if the Metastore contains any additional fields that are not
     // nullable.
     assert(intercept[Throwable] {
-      FSBasedParquetRelation.mergeMetastoreParquetSchema(
+      ParquetRelation2.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("firstfield", StringType, nullable = true),
           StructField("secondfield", StringType, nullable = true),

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index b0e82c8..2aa80b4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.parquet.FSBasedParquetRelation
+import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode, sources}
@@ -226,8 +226,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     // serialize the Metastore schema to JSON and pass it as a data source option because of the
     // evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
     val parquetOptions = Map(
-      FSBasedParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
-      FSBasedParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
+      ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
+      ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
     val tableIdentifier =
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
 
@@ -238,7 +238,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
       cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => None // Cache miss
-        case logical@LogicalRelation(parquetRelation: FSBasedParquetRelation) =>
+        case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
           // If we have the same paths, same schema, and same partition spec,
           // we will use the cached Parquet Relation.
           val useCached =
@@ -281,7 +281,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
       val parquetRelation = cached.getOrElse {
         val created = LogicalRelation(
-          new FSBasedParquetRelation(
+          new ParquetRelation2(
             paths.toArray, None, Some(partitionSpec), parquetOptions)(hive))
         cachedDataSourceTables.put(tableIdentifier, created)
         created
@@ -294,7 +294,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
       val parquetRelation = cached.getOrElse {
         val created = LogicalRelation(
-          new FSBasedParquetRelation(paths.toArray, None, None, parquetOptions)(hive))
+          new ParquetRelation2(paths.toArray, None, None, parquetOptions)(hive))
         cachedDataSourceTables.put(tableIdentifier, created)
         created
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 8e405e0..6609763 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -194,7 +194,7 @@ case class CreateMetastoreDataSourceAsSelect(
             sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath)
           val createdRelation = LogicalRelation(resolved.relation)
           EliminateSubQueries(sqlContext.table(tableName).logicalPlan) match {
-            case l @ LogicalRelation(_: InsertableRelation | _: FSBasedRelation) =>
+            case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation) =>
               if (l.relation != createdRelation.relation) {
                 val errorDescription =
                   s"Cannot append to table $tableName because the resolved relation does not " +

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index da5d203..1bf1c1b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.parquet.FSBasedParquetRelation
+import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -579,11 +579,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       )
 
       table("test_parquet_ctas").queryExecution.optimizedPlan match {
-        case LogicalRelation(p: FSBasedParquetRelation) => // OK
+        case LogicalRelation(p: ParquetRelation2) => // OK
         case _ =>
           fail(
             "test_parquet_ctas should be converted to " +
-            s"${classOf[FSBasedParquetRelation].getCanonicalName}")
+            s"${classOf[ParquetRelation2].getCanonicalName}")
       }
 
       // Clenup and reset confs.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org