You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2015/05/15 10:21:11 UTC

[1/2] spark git commit: [SPARK-7591] [SQL] Partitioning support API tweaks

Repository: spark
Updated Branches:
  refs/heads/master 94761485b -> fdf5bba35


http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 5c7152e..dfe73c6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim, MetastoreRelation}
-import org.apache.spark.sql.parquet.FSBasedParquetRelation
+import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
 
@@ -175,17 +175,17 @@ class SQLQuerySuite extends QueryTest {
     def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
       val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
       relation match {
-        case LogicalRelation(r: FSBasedParquetRelation) =>
+        case LogicalRelation(r: ParquetRelation2) =>
           if (!isDataSourceParquet) {
             fail(
               s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
-              s"${FSBasedParquetRelation.getClass.getCanonicalName}.")
+              s"${ParquetRelation2.getClass.getCanonicalName}.")
           }
 
         case r: MetastoreRelation =>
           if (isDataSourceParquet) {
             fail(
-              s"${FSBasedParquetRelation.getClass.getCanonicalName} is expected, but found " +
+              s"${ParquetRelation2.getClass.getCanonicalName} is expected, but found " +
               s"${classOf[MetastoreRelation].getCanonicalName}.")
           }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 41bcbe8..b6be09e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -26,8 +26,8 @@ import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
 import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.parquet.{FSBasedParquetRelation, ParquetTableScan}
-import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoFSBasedRelation, LogicalRelation}
+import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
+import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
 import org.apache.spark.util.Utils
@@ -291,10 +291,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
     )
 
     table("test_parquet_ctas").queryExecution.optimizedPlan match {
-      case LogicalRelation(_: FSBasedParquetRelation) => // OK
+      case LogicalRelation(_: ParquetRelation2) => // OK
       case _ => fail(
         "test_parquet_ctas should be converted to " +
-          s"${classOf[FSBasedParquetRelation].getCanonicalName}")
+          s"${classOf[ParquetRelation2].getCanonicalName}")
     }
 
     sql("DROP TABLE IF EXISTS test_parquet_ctas")
@@ -315,9 +315,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
     val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
     df.queryExecution.executedPlan match {
-      case ExecutedCommand(InsertIntoFSBasedRelation(_: FSBasedParquetRelation, _, _, _)) => // OK
+      case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation2, _, _, _)) => // OK
       case o => fail("test_insert_parquet should be converted to a " +
-        s"${classOf[FSBasedParquetRelation].getCanonicalName} and " +
+        s"${classOf[ParquetRelation2].getCanonicalName} and " +
         s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " +
         s"However, found a ${o.toString} ")
     }
@@ -345,9 +345,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
     val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
     df.queryExecution.executedPlan match {
-      case ExecutedCommand(InsertIntoFSBasedRelation(r: FSBasedParquetRelation, _, _, _)) => // OK
+      case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation2, _, _, _)) => // OK
       case o => fail("test_insert_parquet should be converted to a " +
-        s"${classOf[FSBasedParquetRelation].getCanonicalName} and " +
+        s"${classOf[ParquetRelation2].getCanonicalName} and " +
         s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
         s"However, found a ${o.toString} ")
     }
@@ -378,7 +378,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
     assertResult(2) {
       analyzed.collect {
-        case r @ LogicalRelation(_: FSBasedParquetRelation) => r
+        case r @ LogicalRelation(_: ParquetRelation2) => r
       }.size
     }
 
@@ -390,7 +390,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
       // Converted test_parquet should be cached.
       catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match {
         case null => fail("Converted test_parquet should be cached in the cache.")
-        case logical @ LogicalRelation(parquetRelation: FSBasedParquetRelation) => // OK
+        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK
         case other =>
           fail(
             "The cached test_parquet should be a Parquet Relation. " +

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 8801aba..29b2158 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -24,7 +24,7 @@ import com.google.common.base.Objects
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{NullWritable, Text}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
-import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
@@ -32,17 +32,16 @@ import org.apache.spark.sql.types.{DataType, StructField, StructType}
 import org.apache.spark.sql.{Row, SQLContext}
 
 /**
- * A simple example [[FSBasedRelationProvider]].
+ * A simple example [[HadoopFsRelationProvider]].
  */
-class SimpleTextSource extends FSBasedRelationProvider {
+class SimpleTextSource extends HadoopFsRelationProvider {
   override def createRelation(
       sqlContext: SQLContext,
       paths: Array[String],
       schema: Option[StructType],
       partitionColumns: Option[StructType],
-      parameters: Map[String, String]): FSBasedRelation = {
-    val partitionsSchema = partitionColumns.getOrElse(StructType(Array.empty[StructField]))
-    new SimpleTextRelation(paths, schema, partitionsSchema, parameters)(sqlContext)
+      parameters: Map[String, String]): HadoopFsRelation = {
+    new SimpleTextRelation(paths, schema, partitionColumns, parameters)(sqlContext)
   }
 }
 
@@ -59,38 +58,30 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW
   }
 }
 
-class SimpleTextOutputWriter extends OutputWriter {
-  private var recordWriter: RecordWriter[NullWritable, Text] = _
-  private var taskAttemptContext: TaskAttemptContext = _
-
-  override def init(
-      path: String,
-      dataSchema: StructType,
-      context: TaskAttemptContext): Unit = {
-    recordWriter = new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context)
-    taskAttemptContext = context
-  }
+class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter {
+  private val recordWriter: RecordWriter[NullWritable, Text] =
+    new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context)
 
   override def write(row: Row): Unit = {
     val serialized = row.toSeq.map(_.toString).mkString(",")
     recordWriter.write(null, new Text(serialized))
   }
 
-  override def close(): Unit = recordWriter.close(taskAttemptContext)
+  override def close(): Unit = recordWriter.close(context)
 }
 
 /**
- * A simple example [[FSBasedRelation]], used for testing purposes.  Data are stored as comma
+ * A simple example [[HadoopFsRelation]], used for testing purposes.  Data are stored as comma
  * separated string lines.  When scanning data, schema must be explicitly provided via data source
  * option `"dataSchema"`.
  */
 class SimpleTextRelation(
-    paths: Array[String],
+    override val paths: Array[String],
     val maybeDataSchema: Option[StructType],
-    partitionsSchema: StructType,
+    override val userDefinedPartitionColumns: Option[StructType],
     parameters: Map[String, String])(
     @transient val sqlContext: SQLContext)
-  extends FSBasedRelation(paths, partitionsSchema) {
+  extends HadoopFsRelation {
 
   import sqlContext.sparkContext
 
@@ -110,9 +101,6 @@ class SimpleTextRelation(
   override def hashCode(): Int =
     Objects.hashCode(paths, maybeDataSchema, dataSchema)
 
-  override def outputWriterClass: Class[_ <: OutputWriter] =
-    classOf[SimpleTextOutputWriter]
-
   override def buildScan(inputPaths: Array[String]): RDD[Row] = {
     val fields = dataSchema.map(_.dataType)
 
@@ -122,4 +110,13 @@ class SimpleTextRelation(
       }: _*)
     }
   }
+
+  override def prepareJobForWrite(job: Job): OutputWriterFactory = new OutputWriterFactory {
+    override def newInstance(
+        path: String,
+        dataSchema: StructType,
+        context: TaskAttemptContext): OutputWriter = {
+      new SimpleTextOutputWriter(path, context)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala
deleted file mode 100644
index 394833f..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala
+++ /dev/null
@@ -1,564 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources
-
-import org.apache.hadoop.fs.Path
-
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql._
-import org.apache.spark.sql.hive.test.TestHive
-import org.apache.spark.sql.parquet.ParquetTest
-import org.apache.spark.sql.types._
-
-// TODO Don't extend ParquetTest
-// This test suite extends ParquetTest for some convenient utility methods. These methods should be
-// moved to some more general places, maybe QueryTest.
-class FSBasedRelationTest extends QueryTest with ParquetTest {
-  override val sqlContext: SQLContext = TestHive
-
-  import sqlContext._
-  import sqlContext.implicits._
-
-  val dataSourceName = classOf[SimpleTextSource].getCanonicalName
-
-  val dataSchema =
-    StructType(
-      Seq(
-        StructField("a", IntegerType, nullable = false),
-        StructField("b", StringType, nullable = false)))
-
-  val testDF = (1 to 3).map(i => (i, s"val_$i")).toDF("a", "b")
-
-  val partitionedTestDF1 = (for {
-    i <- 1 to 3
-    p2 <- Seq("foo", "bar")
-  } yield (i, s"val_$i", 1, p2)).toDF("a", "b", "p1", "p2")
-
-  val partitionedTestDF2 = (for {
-    i <- 1 to 3
-    p2 <- Seq("foo", "bar")
-  } yield (i, s"val_$i", 2, p2)).toDF("a", "b", "p1", "p2")
-
-  val partitionedTestDF = partitionedTestDF1.unionAll(partitionedTestDF2)
-
-  def checkQueries(df: DataFrame): Unit = {
-    // Selects everything
-    checkAnswer(
-      df,
-      for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2))
-
-    // Simple filtering and partition pruning
-    checkAnswer(
-      df.filter('a > 1 && 'p1 === 2),
-      for (i <- 2 to 3; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", 2, p2))
-
-    // Simple projection and filtering
-    checkAnswer(
-      df.filter('a > 1).select('b, 'a + 1),
-      for (i <- 2 to 3; _ <- 1 to 2; _ <- Seq("foo", "bar")) yield Row(s"val_$i", i + 1))
-
-    // Simple projection and partition pruning
-    checkAnswer(
-      df.filter('a > 1 && 'p1 < 2).select('b, 'p1),
-      for (i <- 2 to 3; _ <- Seq("foo", "bar")) yield Row(s"val_$i", 1))
-
-    // Self-join
-    df.registerTempTable("t")
-    withTempTable("t") {
-      checkAnswer(
-        sql(
-          """SELECT l.a, r.b, l.p1, r.p2
-            |FROM t l JOIN t r
-            |ON l.a = r.a AND l.p1 = r.p1 AND l.p2 = r.p2
-          """.stripMargin),
-        for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2))
-    }
-  }
-
-  test("save()/load() - non-partitioned table - Overwrite") {
-    withTempPath { file =>
-      testDF.save(
-        path = file.getCanonicalPath,
-        source = dataSourceName,
-        mode = SaveMode.Overwrite)
-
-      testDF.save(
-        path = file.getCanonicalPath,
-        source = dataSourceName,
-        mode = SaveMode.Overwrite)
-
-      checkAnswer(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)),
-        testDF.collect())
-    }
-  }
-
-  test("save()/load() - non-partitioned table - Append") {
-    withTempPath { file =>
-      testDF.save(
-        path = file.getCanonicalPath,
-        source = dataSourceName,
-        mode = SaveMode.Overwrite)
-
-      testDF.save(
-        path = file.getCanonicalPath,
-        source = dataSourceName,
-        mode = SaveMode.Append)
-
-      checkAnswer(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)).orderBy("a"),
-        testDF.unionAll(testDF).orderBy("a").collect())
-    }
-  }
-
-  test("save()/load() - non-partitioned table - ErrorIfExists") {
-    withTempDir { file =>
-      intercept[RuntimeException] {
-        testDF.save(
-          path = file.getCanonicalPath,
-          source = dataSourceName,
-          mode = SaveMode.ErrorIfExists)
-      }
-    }
-  }
-
-  test("save()/load() - non-partitioned table - Ignore") {
-    withTempDir { file =>
-      testDF.save(
-        path = file.getCanonicalPath,
-        source = dataSourceName,
-        mode = SaveMode.Ignore)
-
-      val path = new Path(file.getCanonicalPath)
-      val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-      assert(fs.listStatus(path).isEmpty)
-    }
-  }
-
-  test("save()/load() - partitioned table - simple queries") {
-    withTempPath { file =>
-      partitionedTestDF.save(
-        source = dataSourceName,
-        mode = SaveMode.ErrorIfExists,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      checkQueries(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)))
-    }
-  }
-
-  test("save()/load() - partitioned table - Overwrite") {
-    withTempPath { file =>
-      partitionedTestDF.save(
-        source = dataSourceName,
-        mode = SaveMode.Overwrite,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      partitionedTestDF.save(
-        source = dataSourceName,
-        mode = SaveMode.Overwrite,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      checkAnswer(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)),
-        partitionedTestDF.collect())
-    }
-  }
-
-  test("save()/load() - partitioned table - Append") {
-    withTempPath { file =>
-      partitionedTestDF.save(
-        source = dataSourceName,
-        mode = SaveMode.Overwrite,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      partitionedTestDF.save(
-        source = dataSourceName,
-        mode = SaveMode.Append,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      checkAnswer(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)),
-        partitionedTestDF.unionAll(partitionedTestDF).collect())
-    }
-  }
-
-  test("save()/load() - partitioned table - Append - new partition values") {
-    withTempPath { file =>
-      partitionedTestDF1.save(
-        source = dataSourceName,
-        mode = SaveMode.Overwrite,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      partitionedTestDF2.save(
-        source = dataSourceName,
-        mode = SaveMode.Append,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      checkAnswer(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)),
-        partitionedTestDF.collect())
-    }
-  }
-
-  test("save()/load() - partitioned table - ErrorIfExists") {
-    withTempDir { file =>
-      intercept[RuntimeException] {
-        partitionedTestDF.save(
-          source = dataSourceName,
-          mode = SaveMode.ErrorIfExists,
-          options = Map("path" -> file.getCanonicalPath),
-          partitionColumns = Seq("p1", "p2"))
-      }
-    }
-  }
-
-  test("save()/load() - partitioned table - Ignore") {
-    withTempDir { file =>
-      partitionedTestDF.save(
-        path = file.getCanonicalPath,
-        source = dataSourceName,
-        mode = SaveMode.Ignore)
-
-      val path = new Path(file.getCanonicalPath)
-      val fs = path.getFileSystem(SparkHadoopUtil.get.conf)
-      assert(fs.listStatus(path).isEmpty)
-    }
-  }
-
-  def withTable(tableName: String)(f: => Unit): Unit = {
-    try f finally sql(s"DROP TABLE $tableName")
-  }
-
-  test("saveAsTable()/load() - non-partitioned table - Overwrite") {
-    testDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite,
-      Map("dataSchema" -> dataSchema.json))
-
-    withTable("t") {
-      checkAnswer(table("t"), testDF.collect())
-    }
-  }
-
-  test("saveAsTable()/load() - non-partitioned table - Append") {
-    testDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite)
-
-    testDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Append)
-
-    withTable("t") {
-      checkAnswer(table("t"), testDF.unionAll(testDF).orderBy("a").collect())
-    }
-  }
-
-  test("saveAsTable()/load() - non-partitioned table - ErrorIfExists") {
-    Seq.empty[(Int, String)].toDF().registerTempTable("t")
-
-    withTempTable("t") {
-      intercept[AnalysisException] {
-        testDF.saveAsTable(
-          tableName = "t",
-          source = dataSourceName,
-          mode = SaveMode.ErrorIfExists)
-      }
-    }
-  }
-
-  test("saveAsTable()/load() - non-partitioned table - Ignore") {
-    Seq.empty[(Int, String)].toDF().registerTempTable("t")
-
-    withTempTable("t") {
-      testDF.saveAsTable(
-        tableName = "t",
-        source = dataSourceName,
-        mode = SaveMode.Ignore)
-
-      assert(table("t").collect().isEmpty)
-    }
-  }
-
-  test("saveAsTable()/load() - partitioned table - simple queries") {
-    partitionedTestDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite,
-      Map("dataSchema" -> dataSchema.json))
-
-    withTable("t") {
-      checkQueries(table("t"))
-    }
-  }
-
-  test("saveAsTable()/load() - partitioned table - Overwrite") {
-    partitionedTestDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    partitionedTestDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    withTable("t") {
-      checkAnswer(table("t"), partitionedTestDF.collect())
-    }
-  }
-
-  test("saveAsTable()/load() - partitioned table - Append") {
-    partitionedTestDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    partitionedTestDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Append,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    withTable("t") {
-      checkAnswer(table("t"), partitionedTestDF.unionAll(partitionedTestDF).collect())
-    }
-  }
-
-  test("saveAsTable()/load() - partitioned table - Append - new partition values") {
-    partitionedTestDF1.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    partitionedTestDF2.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Append,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    withTable("t") {
-      checkAnswer(table("t"), partitionedTestDF.collect())
-    }
-  }
-
-  test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") {
-    partitionedTestDF1.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    // Using only a subset of all partition columns
-    intercept[Throwable] {
-      partitionedTestDF2.saveAsTable(
-        tableName = "t",
-        source = dataSourceName,
-        mode = SaveMode.Append,
-        options = Map("dataSchema" -> dataSchema.json),
-        partitionColumns = Seq("p1"))
-    }
-
-    // Using different order of partition columns
-    intercept[Throwable] {
-      partitionedTestDF2.saveAsTable(
-        tableName = "t",
-        source = dataSourceName,
-        mode = SaveMode.Append,
-        options = Map("dataSchema" -> dataSchema.json),
-        partitionColumns = Seq("p2", "p1"))
-    }
-  }
-
-  test("saveAsTable()/load() - partitioned table - ErrorIfExists") {
-    Seq.empty[(Int, String)].toDF().registerTempTable("t")
-
-    withTempTable("t") {
-      intercept[AnalysisException] {
-        partitionedTestDF.saveAsTable(
-          tableName = "t",
-          source = dataSourceName,
-          mode = SaveMode.ErrorIfExists,
-          options = Map("dataSchema" -> dataSchema.json),
-          partitionColumns = Seq("p1", "p2"))
-      }
-    }
-  }
-
-  test("saveAsTable()/load() - partitioned table - Ignore") {
-    Seq.empty[(Int, String)].toDF().registerTempTable("t")
-
-    withTempTable("t") {
-      partitionedTestDF.saveAsTable(
-        tableName = "t",
-        source = dataSourceName,
-        mode = SaveMode.Ignore,
-        options = Map("dataSchema" -> dataSchema.json),
-        partitionColumns = Seq("p1", "p2"))
-
-      assert(table("t").collect().isEmpty)
-    }
-  }
-
-  test("Hadoop style globbing") {
-    withTempPath { file =>
-      partitionedTestDF.save(
-        source = dataSourceName,
-        mode = SaveMode.Overwrite,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      val df = load(
-        source = dataSourceName,
-        options = Map(
-          "path" -> s"${file.getCanonicalPath}/p1=*/p2=???",
-          "dataSchema" -> dataSchema.json))
-
-      val expectedPaths = Set(
-        s"${file.getCanonicalFile}/p1=1/p2=foo",
-        s"${file.getCanonicalFile}/p1=2/p2=foo",
-        s"${file.getCanonicalFile}/p1=1/p2=bar",
-        s"${file.getCanonicalFile}/p1=2/p2=bar"
-      ).map { p =>
-        val path = new Path(p)
-        val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-        path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
-      }
-
-      val actualPaths = df.queryExecution.analyzed.collectFirst {
-        case LogicalRelation(relation: FSBasedRelation) =>
-          relation.paths.toSet
-      }.getOrElse {
-        fail("Expect an FSBasedRelation, but none could be found")
-      }
-
-      assert(actualPaths === expectedPaths)
-      checkAnswer(df, partitionedTestDF.collect())
-    }
-  }
-}
-
-class SimpleTextRelationSuite extends FSBasedRelationTest {
-  override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName
-
-  import sqlContext._
-
-  test("save()/load() - partitioned table - simple queries - partition columns in data") {
-    withTempDir { file =>
-      val basePath = new Path(file.getCanonicalPath)
-      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
-      val qualifiedBasePath = fs.makeQualified(basePath)
-
-      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
-        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
-        sparkContext
-          .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
-          .saveAsTextFile(partitionDir.toString)
-      }
-
-      val dataSchemaWithPartition =
-        StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
-
-      checkQueries(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchemaWithPartition.json)))
-    }
-  }
-}
-
-class FSBasedParquetRelationSuite extends FSBasedRelationTest {
-  override val dataSourceName: String = classOf[parquet.DefaultSource].getCanonicalName
-
-  import sqlContext._
-  import sqlContext.implicits._
-
-  test("save()/load() - partitioned table - simple queries - partition columns in data") {
-    withTempDir { file =>
-      val basePath = new Path(file.getCanonicalPath)
-      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
-      val qualifiedBasePath = fs.makeQualified(basePath)
-
-      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
-        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
-        sparkContext
-          .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
-          .toDF("a", "b", "p1")
-          .saveAsParquetFile(partitionDir.toString)
-      }
-
-      val dataSchemaWithPartition =
-        StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
-
-      checkQueries(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchemaWithPartition.json)))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
new file mode 100644
index 0000000..cf6afd2
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql._
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.parquet.ParquetTest
+import org.apache.spark.sql.types._
+
+// TODO Don't extend ParquetTest
+// This test suite extends ParquetTest for some convenient utility methods. These methods should be
+// moved to some more general places, maybe QueryTest.
+class HadoopFsRelationTest extends QueryTest with ParquetTest {
+  override val sqlContext: SQLContext = TestHive
+
+  import sqlContext._
+  import sqlContext.implicits._
+
+  val dataSourceName = classOf[SimpleTextSource].getCanonicalName
+
+  val dataSchema =
+    StructType(
+      Seq(
+        StructField("a", IntegerType, nullable = false),
+        StructField("b", StringType, nullable = false)))
+
+  val testDF = (1 to 3).map(i => (i, s"val_$i")).toDF("a", "b")
+
+  val partitionedTestDF1 = (for {
+    i <- 1 to 3
+    p2 <- Seq("foo", "bar")
+  } yield (i, s"val_$i", 1, p2)).toDF("a", "b", "p1", "p2")
+
+  val partitionedTestDF2 = (for {
+    i <- 1 to 3
+    p2 <- Seq("foo", "bar")
+  } yield (i, s"val_$i", 2, p2)).toDF("a", "b", "p1", "p2")
+
+  val partitionedTestDF = partitionedTestDF1.unionAll(partitionedTestDF2)
+
+  def checkQueries(df: DataFrame): Unit = {
+    // Selects everything
+    checkAnswer(
+      df,
+      for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2))
+
+    // Simple filtering and partition pruning
+    checkAnswer(
+      df.filter('a > 1 && 'p1 === 2),
+      for (i <- 2 to 3; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", 2, p2))
+
+    // Simple projection and filtering
+    checkAnswer(
+      df.filter('a > 1).select('b, 'a + 1),
+      for (i <- 2 to 3; _ <- 1 to 2; _ <- Seq("foo", "bar")) yield Row(s"val_$i", i + 1))
+
+    // Simple projection and partition pruning
+    checkAnswer(
+      df.filter('a > 1 && 'p1 < 2).select('b, 'p1),
+      for (i <- 2 to 3; _ <- Seq("foo", "bar")) yield Row(s"val_$i", 1))
+
+    // Self-join
+    df.registerTempTable("t")
+    withTempTable("t") {
+      checkAnswer(
+        sql(
+          """SELECT l.a, r.b, l.p1, r.p2
+            |FROM t l JOIN t r
+            |ON l.a = r.a AND l.p1 = r.p1 AND l.p2 = r.p2
+          """.stripMargin),
+        for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2))
+    }
+  }
+
+  test("save()/load() - non-partitioned table - Overwrite") {
+    withTempPath { file =>
+      testDF.save(
+        path = file.getCanonicalPath,
+        source = dataSourceName,
+        mode = SaveMode.Overwrite)
+
+      testDF.save(
+        path = file.getCanonicalPath,
+        source = dataSourceName,
+        mode = SaveMode.Overwrite)
+
+      checkAnswer(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)),
+        testDF.collect())
+    }
+  }
+
+  test("save()/load() - non-partitioned table - Append") {
+    withTempPath { file =>
+      testDF.save(
+        path = file.getCanonicalPath,
+        source = dataSourceName,
+        mode = SaveMode.Overwrite)
+
+      testDF.save(
+        path = file.getCanonicalPath,
+        source = dataSourceName,
+        mode = SaveMode.Append)
+
+      checkAnswer(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)).orderBy("a"),
+        testDF.unionAll(testDF).orderBy("a").collect())
+    }
+  }
+
+  test("save()/load() - non-partitioned table - ErrorIfExists") {
+    withTempDir { file =>
+      intercept[RuntimeException] {
+        testDF.save(
+          path = file.getCanonicalPath,
+          source = dataSourceName,
+          mode = SaveMode.ErrorIfExists)
+      }
+    }
+  }
+
+  test("save()/load() - non-partitioned table - Ignore") {
+    withTempDir { file =>
+      testDF.save(
+        path = file.getCanonicalPath,
+        source = dataSourceName,
+        mode = SaveMode.Ignore)
+
+      val path = new Path(file.getCanonicalPath)
+      val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+      assert(fs.listStatus(path).isEmpty)
+    }
+  }
+
+  test("save()/load() - partitioned table - simple queries") {
+    withTempPath { file =>
+      partitionedTestDF.save(
+        source = dataSourceName,
+        mode = SaveMode.ErrorIfExists,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      checkQueries(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)))
+    }
+  }
+
+  test("save()/load() - partitioned table - Overwrite") {
+    withTempPath { file =>
+      partitionedTestDF.save(
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      partitionedTestDF.save(
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      checkAnswer(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)),
+        partitionedTestDF.collect())
+    }
+  }
+
+  test("save()/load() - partitioned table - Append") {
+    withTempPath { file =>
+      partitionedTestDF.save(
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      partitionedTestDF.save(
+        source = dataSourceName,
+        mode = SaveMode.Append,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      checkAnswer(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)),
+        partitionedTestDF.unionAll(partitionedTestDF).collect())
+    }
+  }
+
+  test("save()/load() - partitioned table - Append - new partition values") {
+    withTempPath { file =>
+      partitionedTestDF1.save(
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      partitionedTestDF2.save(
+        source = dataSourceName,
+        mode = SaveMode.Append,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      checkAnswer(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)),
+        partitionedTestDF.collect())
+    }
+  }
+
+  test("save()/load() - partitioned table - ErrorIfExists") {
+    withTempDir { file =>
+      intercept[RuntimeException] {
+        partitionedTestDF.save(
+          source = dataSourceName,
+          mode = SaveMode.ErrorIfExists,
+          options = Map("path" -> file.getCanonicalPath),
+          partitionColumns = Seq("p1", "p2"))
+      }
+    }
+  }
+
+  test("save()/load() - partitioned table - Ignore") {
+    withTempDir { file =>
+      partitionedTestDF.save(
+        path = file.getCanonicalPath,
+        source = dataSourceName,
+        mode = SaveMode.Ignore)
+
+      val path = new Path(file.getCanonicalPath)
+      val fs = path.getFileSystem(SparkHadoopUtil.get.conf)
+      assert(fs.listStatus(path).isEmpty)
+    }
+  }
+
+  def withTable(tableName: String)(f: => Unit): Unit = {
+    try f finally sql(s"DROP TABLE $tableName")
+  }
+
+  test("saveAsTable()/load() - non-partitioned table - Overwrite") {
+    testDF.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Overwrite,
+      Map("dataSchema" -> dataSchema.json))
+
+    withTable("t") {
+      checkAnswer(table("t"), testDF.collect())
+    }
+  }
+
+  test("saveAsTable()/load() - non-partitioned table - Append") {
+    testDF.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Overwrite)
+
+    testDF.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Append)
+
+    withTable("t") {
+      checkAnswer(table("t"), testDF.unionAll(testDF).orderBy("a").collect())
+    }
+  }
+
+  test("saveAsTable()/load() - non-partitioned table - ErrorIfExists") {
+    Seq.empty[(Int, String)].toDF().registerTempTable("t")
+
+    withTempTable("t") {
+      intercept[AnalysisException] {
+        testDF.saveAsTable(
+          tableName = "t",
+          source = dataSourceName,
+          mode = SaveMode.ErrorIfExists)
+      }
+    }
+  }
+
+  test("saveAsTable()/load() - non-partitioned table - Ignore") {
+    Seq.empty[(Int, String)].toDF().registerTempTable("t")
+
+    withTempTable("t") {
+      testDF.saveAsTable(
+        tableName = "t",
+        source = dataSourceName,
+        mode = SaveMode.Ignore)
+
+      assert(table("t").collect().isEmpty)
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - simple queries") {
+    partitionedTestDF.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Overwrite,
+      Map("dataSchema" -> dataSchema.json))
+
+    withTable("t") {
+      checkQueries(table("t"))
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - Overwrite") {
+    partitionedTestDF.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Overwrite,
+      options = Map("dataSchema" -> dataSchema.json),
+      partitionColumns = Seq("p1", "p2"))
+
+    partitionedTestDF.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Overwrite,
+      options = Map("dataSchema" -> dataSchema.json),
+      partitionColumns = Seq("p1", "p2"))
+
+    withTable("t") {
+      checkAnswer(table("t"), partitionedTestDF.collect())
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - Append") {
+    partitionedTestDF.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Overwrite,
+      options = Map("dataSchema" -> dataSchema.json),
+      partitionColumns = Seq("p1", "p2"))
+
+    partitionedTestDF.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Append,
+      options = Map("dataSchema" -> dataSchema.json),
+      partitionColumns = Seq("p1", "p2"))
+
+    withTable("t") {
+      checkAnswer(table("t"), partitionedTestDF.unionAll(partitionedTestDF).collect())
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - Append - new partition values") {
+    partitionedTestDF1.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Overwrite,
+      options = Map("dataSchema" -> dataSchema.json),
+      partitionColumns = Seq("p1", "p2"))
+
+    partitionedTestDF2.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Append,
+      options = Map("dataSchema" -> dataSchema.json),
+      partitionColumns = Seq("p1", "p2"))
+
+    withTable("t") {
+      checkAnswer(table("t"), partitionedTestDF.collect())
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") {
+    partitionedTestDF1.saveAsTable(
+      tableName = "t",
+      source = dataSourceName,
+      mode = SaveMode.Overwrite,
+      options = Map("dataSchema" -> dataSchema.json),
+      partitionColumns = Seq("p1", "p2"))
+
+    // Using only a subset of all partition columns
+    intercept[Throwable] {
+      partitionedTestDF2.saveAsTable(
+        tableName = "t",
+        source = dataSourceName,
+        mode = SaveMode.Append,
+        options = Map("dataSchema" -> dataSchema.json),
+        partitionColumns = Seq("p1"))
+    }
+
+    // Using different order of partition columns
+    intercept[Throwable] {
+      partitionedTestDF2.saveAsTable(
+        tableName = "t",
+        source = dataSourceName,
+        mode = SaveMode.Append,
+        options = Map("dataSchema" -> dataSchema.json),
+        partitionColumns = Seq("p2", "p1"))
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - ErrorIfExists") {
+    Seq.empty[(Int, String)].toDF().registerTempTable("t")
+
+    withTempTable("t") {
+      intercept[AnalysisException] {
+        partitionedTestDF.saveAsTable(
+          tableName = "t",
+          source = dataSourceName,
+          mode = SaveMode.ErrorIfExists,
+          options = Map("dataSchema" -> dataSchema.json),
+          partitionColumns = Seq("p1", "p2"))
+      }
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - Ignore") {
+    Seq.empty[(Int, String)].toDF().registerTempTable("t")
+
+    withTempTable("t") {
+      partitionedTestDF.saveAsTable(
+        tableName = "t",
+        source = dataSourceName,
+        mode = SaveMode.Ignore,
+        options = Map("dataSchema" -> dataSchema.json),
+        partitionColumns = Seq("p1", "p2"))
+
+      assert(table("t").collect().isEmpty)
+    }
+  }
+
+  test("Hadoop style globbing") {
+    withTempPath { file =>
+      partitionedTestDF.save(
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      val df = load(
+        source = dataSourceName,
+        options = Map(
+          "path" -> s"${file.getCanonicalPath}/p1=*/p2=???",
+          "dataSchema" -> dataSchema.json))
+
+      val expectedPaths = Set(
+        s"${file.getCanonicalFile}/p1=1/p2=foo",
+        s"${file.getCanonicalFile}/p1=2/p2=foo",
+        s"${file.getCanonicalFile}/p1=1/p2=bar",
+        s"${file.getCanonicalFile}/p1=2/p2=bar"
+      ).map { p =>
+        val path = new Path(p)
+        val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+        path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
+      }
+
+      val actualPaths = df.queryExecution.analyzed.collectFirst {
+        case LogicalRelation(relation: HadoopFsRelation) =>
+          relation.paths.toSet
+      }.getOrElse {
+        fail("Expect an FSBasedRelation, but none could be found")
+      }
+
+      assert(actualPaths === expectedPaths)
+      checkAnswer(df, partitionedTestDF.collect())
+    }
+  }
+}
+
+class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
+  override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName
+
+  import sqlContext._
+
+  test("save()/load() - partitioned table - simple queries - partition columns in data") {
+    withTempDir { file =>
+      val basePath = new Path(file.getCanonicalPath)
+      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
+      val qualifiedBasePath = fs.makeQualified(basePath)
+
+      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
+        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+        sparkContext
+          .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
+          .saveAsTextFile(partitionDir.toString)
+      }
+
+      val dataSchemaWithPartition =
+        StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
+
+      checkQueries(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchemaWithPartition.json)))
+    }
+  }
+}
+
+class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
+  override val dataSourceName: String = classOf[parquet.DefaultSource].getCanonicalName
+
+  import sqlContext._
+  import sqlContext.implicits._
+
+  test("save()/load() - partitioned table - simple queries - partition columns in data") {
+    withTempDir { file =>
+      val basePath = new Path(file.getCanonicalPath)
+      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
+      val qualifiedBasePath = fs.makeQualified(basePath)
+
+      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
+        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+        sparkContext
+          .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
+          .toDF("a", "b", "p1")
+          .saveAsParquetFile(partitionDir.toString)
+      }
+
+      val dataSchemaWithPartition =
+        StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
+
+      checkQueries(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchemaWithPartition.json)))
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/2] spark git commit: [SPARK-7591] [SQL] Partitioning support API tweaks

Posted by li...@apache.org.
[SPARK-7591] [SQL] Partitioning support API tweaks

Please see [SPARK-7591] [1] for the details.

/cc rxin marmbrus yhuai

[1]: https://issues.apache.org/jira/browse/SPARK-7591

Author: Cheng Lian <li...@databricks.com>

Closes #6150 from liancheng/spark-7591 and squashes the following commits:

af422e7 [Cheng Lian] Addresses @rxin's comments
37d1738 [Cheng Lian] Fixes HadoopFsRelation partition columns initialization
2fc680a [Cheng Lian] Fixes Scala style issue
189ad23 [Cheng Lian] Removes HadoopFsRelation constructor arguments
522c24e [Cheng Lian] Adds OutputWriterFactory
047d40d [Cheng Lian] Renames FSBased* to HadoopFs*, also renamed FSBasedParquetRelation back to ParquetRelation2


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fdf5bba3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fdf5bba3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fdf5bba3

Branch: refs/heads/master
Commit: fdf5bba35d201fe0de3901b4d47262c485c76569
Parents: 9476148
Author: Cheng Lian <li...@databricks.com>
Authored: Fri May 15 16:20:49 2015 +0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Fri May 15 16:20:49 2015 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLContext.scala |  14 +-
 .../spark/sql/parquet/fsBasedParquet.scala      | 565 ------------------
 .../apache/spark/sql/parquet/newParquet.scala   | 570 +++++++++++++++++++
 .../spark/sql/sources/DataSourceStrategy.scala  |  10 +-
 .../spark/sql/sources/PartitioningUtils.scala   |   4 +
 .../org/apache/spark/sql/sources/commands.scala |  23 +-
 .../org/apache/spark/sql/sources/ddl.scala      |   8 +-
 .../apache/spark/sql/sources/interfaces.scala   | 140 +++--
 .../org/apache/spark/sql/sources/rules.scala    |   2 +-
 .../spark/sql/parquet/ParquetFilterSuite.scala  |   2 +-
 .../spark/sql/parquet/ParquetSchemaSuite.scala  |  12 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  12 +-
 .../spark/sql/hive/execution/commands.scala     |   2 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |   6 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |   8 +-
 .../apache/spark/sql/hive/parquetSuites.scala   |  20 +-
 .../spark/sql/sources/SimpleTextRelation.scala  |  47 +-
 .../sql/sources/fsBasedRelationSuites.scala     | 564 ------------------
 .../sql/sources/hadoopFsRelationSuites.scala    | 564 ++++++++++++++++++
 19 files changed, 1287 insertions(+), 1286 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index b33a700..9fb355e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.ParserDialect
 import org.apache.spark.sql.execution.{Filter, _}
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
 import org.apache.spark.sql.json._
-import org.apache.spark.sql.parquet.FSBasedParquetRelation
+import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -610,7 +610,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     } else if (conf.parquetUseDataSourceApi) {
       val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray
       baseRelationToDataFrame(
-        new FSBasedParquetRelation(
+        new ParquetRelation2(
           globbedPaths.map(_.toString), None, None, Map.empty[String, String])(this))
     } else {
       DataFrame(this, parquet.ParquetRelation(
@@ -989,7 +989,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   def jdbc(url: String, table: String): DataFrame = {
     jdbc(url, table, JDBCRelation.columnPartition(null), new Properties())
   }
-  
+
   /**
    * :: Experimental ::
    * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1002,7 +1002,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   def jdbc(url: String, table: String, properties: Properties): DataFrame = {
     jdbc(url, table, JDBCRelation.columnPartition(null), properties)
   }
-  
+
   /**
    * :: Experimental ::
    * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1020,7 +1020,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   @Experimental
   def jdbc(
       url: String,
-      table: String,  
+      table: String,
       columnName: String,
       lowerBound: Long,
       upperBound: Long,
@@ -1056,7 +1056,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     val parts = JDBCRelation.columnPartition(partitioning)
     jdbc(url, table, parts, properties)
   }
-  
+
   /**
    * :: Experimental ::
    * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1093,7 +1093,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     }
     jdbc(url, table, parts, properties)
   }
-  
+
   private def jdbc(
       url: String,
       table: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala
deleted file mode 100644
index c83a9c3..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala
+++ /dev/null
@@ -1,565 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.util.{List => JList}
-
-import scala.collection.JavaConversions._
-import scala.util.Try
-
-import com.google.common.base.Objects
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import parquet.filter2.predicate.FilterApi
-import parquet.format.converter.ParquetMetadataConverter
-import parquet.hadoop._
-import parquet.hadoop.metadata.CompressionCodecName
-import parquet.hadoop.util.ContextUtil
-
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.RDD._
-import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.sql.{Row, SQLConf, SQLContext}
-import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
-
-private[sql] class DefaultSource extends FSBasedRelationProvider {
-  override def createRelation(
-      sqlContext: SQLContext,
-      paths: Array[String],
-      schema: Option[StructType],
-      partitionColumns: Option[StructType],
-      parameters: Map[String, String]): FSBasedRelation = {
-    val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty))
-    new FSBasedParquetRelation(paths, schema, partitionSpec, parameters)(sqlContext)
-  }
-}
-
-// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
-private[sql] class ParquetOutputWriter extends OutputWriter {
-  private var recordWriter: RecordWriter[Void, Row] = _
-  private var taskAttemptContext: TaskAttemptContext = _
-
-  override def init(
-      path: String,
-      dataSchema: StructType,
-      context: TaskAttemptContext): Unit = {
-    val conf = context.getConfiguration
-    val outputFormat = {
-      // When appending new Parquet files to an existing Parquet file directory, to avoid
-      // overwriting existing data files, we need to find out the max task ID encoded in these data
-      // file names.
-      // TODO Make this snippet a utility function for other data source developers
-      val maxExistingTaskId = {
-        // Note that `path` may point to a temporary location.  Here we retrieve the real
-        // destination path from the configuration
-        val outputPath = new Path(conf.get("spark.sql.sources.output.path"))
-        val fs = outputPath.getFileSystem(conf)
-
-        if (fs.exists(outputPath)) {
-          // Pattern used to match task ID in part file names, e.g.:
-          //
-          //   part-r-00001.gz.part
-          //          ^~~~~
-          val partFilePattern = """part-.-(\d{1,}).*""".r
-
-          fs.listStatus(outputPath).map(_.getPath.getName).map {
-            case partFilePattern(id) => id.toInt
-            case name if name.startsWith("_") => 0
-            case name if name.startsWith(".") => 0
-            case name => sys.error(
-              s"""Trying to write Parquet files to directory $outputPath,
-                 |but found items with illegal name "$name"
-               """.stripMargin.replace('\n', ' ').trim)
-          }.reduceOption(_ max _).getOrElse(0)
-        } else {
-          0
-        }
-      }
-
-      new ParquetOutputFormat[Row]() {
-        // Here we override `getDefaultWorkFile` for two reasons:
-        //
-        //  1. To allow appending.  We need to generate output file name based on the max available
-        //     task ID computed above.
-        //
-        //  2. To allow dynamic partitioning.  Default `getDefaultWorkFile` uses
-        //     `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
-        //     partitions in the case of dynamic partitioning.
-        override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-          val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1
-          new Path(path, f"part-r-$split%05d$extension")
-        }
-      }
-    }
-
-    recordWriter = outputFormat.getRecordWriter(context)
-    taskAttemptContext = context
-  }
-
-  override def write(row: Row): Unit = recordWriter.write(null, row)
-
-  override def close(): Unit = recordWriter.close(taskAttemptContext)
-}
-
-private[sql] class FSBasedParquetRelation(
-    paths: Array[String],
-    private val maybeDataSchema: Option[StructType],
-    private val maybePartitionSpec: Option[PartitionSpec],
-    parameters: Map[String, String])(
-    val sqlContext: SQLContext)
-  extends FSBasedRelation(paths, maybePartitionSpec)
-  with Logging {
-
-  // Should we merge schemas from all Parquet part-files?
-  private val shouldMergeSchemas =
-    parameters.getOrElse(FSBasedParquetRelation.MERGE_SCHEMA, "true").toBoolean
-
-  private val maybeMetastoreSchema = parameters
-    .get(FSBasedParquetRelation.METASTORE_SCHEMA)
-    .map(DataType.fromJson(_).asInstanceOf[StructType])
-
-  private val metadataCache = new MetadataCache
-  metadataCache.refresh()
-
-  override def equals(other: scala.Any): Boolean = other match {
-    case that: FSBasedParquetRelation =>
-      val schemaEquality = if (shouldMergeSchemas) {
-        this.shouldMergeSchemas == that.shouldMergeSchemas
-      } else {
-        this.dataSchema == that.dataSchema &&
-          this.schema == that.schema
-      }
-
-      this.paths.toSet == that.paths.toSet &&
-        schemaEquality &&
-        this.maybeDataSchema == that.maybeDataSchema &&
-        this.partitionColumns == that.partitionColumns
-
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    if (shouldMergeSchemas) {
-      Objects.hashCode(
-        Boolean.box(shouldMergeSchemas),
-        paths.toSet,
-        maybeDataSchema,
-        maybePartitionSpec)
-    } else {
-      Objects.hashCode(
-        Boolean.box(shouldMergeSchemas),
-        paths.toSet,
-        dataSchema,
-        schema,
-        maybeDataSchema,
-        maybePartitionSpec)
-    }
-  }
-
-  override def outputWriterClass: Class[_ <: OutputWriter] = classOf[ParquetOutputWriter]
-
-  override def dataSchema: StructType = metadataCache.dataSchema
-
-  override private[sql] def refresh(): Unit = {
-    metadataCache.refresh()
-    super.refresh()
-  }
-
-  // Parquet data source always uses Catalyst internal representations.
-  override val needConversion: Boolean = false
-
-  override val sizeInBytes = metadataCache.dataStatuses.map(_.getLen).sum
-
-  override def prepareForWrite(job: Job): Unit = {
-    val conf = ContextUtil.getConfiguration(job)
-
-    val committerClass =
-      conf.getClass(
-        "spark.sql.parquet.output.committer.class",
-        classOf[ParquetOutputCommitter],
-        classOf[ParquetOutputCommitter])
-
-    conf.setClass(
-      "mapred.output.committer.class",
-      committerClass,
-      classOf[ParquetOutputCommitter])
-
-    // TODO There's no need to use two kinds of WriteSupport
-    // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
-    // complex types.
-    val writeSupportClass =
-      if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
-        classOf[MutableRowWriteSupport]
-      } else {
-        classOf[RowWriteSupport]
-      }
-
-    ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)
-    RowWriteSupport.setSchema(dataSchema.toAttributes, conf)
-
-    // Sets compression scheme
-    conf.set(
-      ParquetOutputFormat.COMPRESSION,
-      ParquetRelation
-        .shortParquetCompressionCodecNames
-        .getOrElse(
-          sqlContext.conf.parquetCompressionCodec.toUpperCase,
-          CompressionCodecName.UNCOMPRESSED).name())
-  }
-
-  override def buildScan(
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      inputPaths: Array[String]): RDD[Row] = {
-
-    val job = new Job(SparkHadoopUtil.get.conf)
-    val conf = ContextUtil.getConfiguration(job)
-
-    ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
-
-    if (inputPaths.nonEmpty) {
-      FileInputFormat.setInputPaths(job, inputPaths.map(new Path(_)): _*)
-    }
-
-    // Try to push down filters when filter push-down is enabled.
-    if (sqlContext.conf.parquetFilterPushDown) {
-      filters
-        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-        // is used here.
-        .flatMap(ParquetFilters.createFilter(dataSchema, _))
-        .reduceOption(FilterApi.and)
-        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
-    }
-
-    conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
-      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
-      ParquetTypesConverter.convertToString(requestedSchema.toAttributes)
-    })
-
-    conf.set(
-      RowWriteSupport.SPARK_ROW_SCHEMA,
-      ParquetTypesConverter.convertToString(dataSchema.toAttributes))
-
-    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
-    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
-    conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
-
-    val inputFileStatuses =
-      metadataCache.dataStatuses.filter(f => inputPaths.contains(f.getPath.toString))
-
-    val footers = inputFileStatuses.map(metadataCache.footers)
-
-    // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
-    // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
-    // footers.  Especially when a global arbitrative schema (either from metastore or data source
-    // DDL) is available.
-    new NewHadoopRDD(
-      sqlContext.sparkContext,
-      classOf[FilteringParquetRowInputFormat],
-      classOf[Void],
-      classOf[Row],
-      conf) {
-
-      val cacheMetadata = useMetadataCache
-
-      @transient val cachedStatuses = inputFileStatuses.map { f =>
-        // In order to encode the authority of a Path containing special characters such as /,
-        // we need to use the string returned by the URI of the path to create a new Path.
-        val pathWithAuthority = new Path(f.getPath.toUri.toString)
-
-        new FileStatus(
-          f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
-          f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
-      }.toSeq
-
-      @transient val cachedFooters = footers.map { f =>
-        // In order to encode the authority of a Path containing special characters such as /,
-        // we need to use the string returned by the URI of the path to create a new Path.
-        new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
-      }.toSeq
-
-      // Overridden so we can inject our own cached files statuses.
-      override def getPartitions: Array[SparkPartition] = {
-        val inputFormat = if (cacheMetadata) {
-          new FilteringParquetRowInputFormat {
-            override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
-
-            override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
-          }
-        } else {
-          new FilteringParquetRowInputFormat
-        }
-
-        val jobContext = newJobContext(getConf, jobId)
-        val rawSplits = inputFormat.getSplits(jobContext)
-
-        Array.tabulate[SparkPartition](rawSplits.size) { i =>
-          new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
-        }
-      }
-    }.values
-  }
-
-  private class MetadataCache {
-    // `FileStatus` objects of all "_metadata" files.
-    private var metadataStatuses: Array[FileStatus] = _
-
-    // `FileStatus` objects of all "_common_metadata" files.
-    private var commonMetadataStatuses: Array[FileStatus] = _
-
-    // Parquet footer cache.
-    var footers: Map[FileStatus, Footer] = _
-
-    // `FileStatus` objects of all data files (Parquet part-files).
-    var dataStatuses: Array[FileStatus] = _
-
-    // Schema of the actual Parquet files, without partition columns discovered from partition
-    // directory paths.
-    var dataSchema: StructType = _
-
-    // Schema of the whole table, including partition columns.
-    var schema: StructType = _
-
-    /**
-     * Refreshes `FileStatus`es, footers, partition spec, and table schema.
-     */
-    def refresh(): Unit = {
-      // Support either reading a collection of raw Parquet part-files, or a collection of folders
-      // containing Parquet files (e.g. partitioned Parquet table).
-      val baseStatuses = paths.distinct.flatMap { p =>
-        val path = new Path(p)
-        val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-        val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        Try(fs.getFileStatus(qualified)).toOption
-      }
-      assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir))
-
-      // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
-      val leaves = baseStatuses.flatMap { f =>
-        val fs = FileSystem.get(f.getPath.toUri, SparkHadoopUtil.get.conf)
-        SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
-          isSummaryFile(f.getPath) ||
-            !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
-        }
-      }
-
-      dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
-      metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
-      commonMetadataStatuses =
-        leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
-
-      footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
-        val parquetMetadata = ParquetFileReader.readFooter(
-          SparkHadoopUtil.get.conf, f, ParquetMetadataConverter.NO_FILTER)
-        f -> new Footer(f.getPath, parquetMetadata)
-      }.seq.toMap
-
-      dataSchema = {
-        val dataSchema0 =
-          maybeDataSchema
-            .orElse(readSchema())
-            .orElse(maybeMetastoreSchema)
-            .getOrElse(sys.error("Failed to get the schema."))
-
-        // If this Parquet relation is converted from a Hive Metastore table, must reconcile case
-        // case insensitivity issue and possible schema mismatch (probably caused by schema
-        // evolution).
-        maybeMetastoreSchema
-          .map(FSBasedParquetRelation.mergeMetastoreParquetSchema(_, dataSchema0))
-          .getOrElse(dataSchema0)
-      }
-    }
-
-    private def isSummaryFile(file: Path): Boolean = {
-      file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
-        file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
-    }
-
-    private def readSchema(): Option[StructType] = {
-      // Sees which file(s) we need to touch in order to figure out the schema.
-      //
-      // Always tries the summary files first if users don't require a merged schema.  In this case,
-      // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
-      // groups information, and could be much smaller for large Parquet files with lots of row
-      // groups.
-      //
-      // NOTE: Metadata stored in the summary files are merged from all part-files.  However, for
-      // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
-      // how to merge them correctly if some key is associated with different values in different
-      // part-files.  When this happens, Parquet simply gives up generating the summary file.  This
-      // implies that if a summary file presents, then:
-      //
-      //   1. Either all part-files have exactly the same Spark SQL schema, or
-      //   2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
-      //      their schemas may differ from each other).
-      //
-      // Here we tend to be pessimistic and take the second case into account.  Basically this means
-      // we can't trust the summary files if users require a merged schema, and must touch all part-
-      // files to do the merge.
-      val filesToTouch =
-        if (shouldMergeSchemas) {
-          // Also includes summary files, 'cause there might be empty partition directories.
-          (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
-        } else {
-          // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
-          // don't have this.
-          commonMetadataStatuses.headOption
-            // Falls back to "_metadata"
-            .orElse(metadataStatuses.headOption)
-            // Summary file(s) not found, the Parquet file is either corrupted, or different part-
-            // files contain conflicting user defined metadata (two or more values are associated
-            // with a same key in different files).  In either case, we fall back to any of the
-            // first part-file, and just assume all schemas are consistent.
-            .orElse(dataStatuses.headOption)
-            .toSeq
-        }
-
-      assert(
-        filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined,
-        "No schema defined, " +
-          s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")
-
-      FSBasedParquetRelation.readSchema(filesToTouch.map(footers.apply), sqlContext)
-    }
-  }
-}
-
-private[sql] object FSBasedParquetRelation extends Logging {
-  // Whether we should merge schemas collected from all Parquet part-files.
-  private[sql] val MERGE_SCHEMA = "mergeSchema"
-
-  // Hive Metastore schema, used when converting Metastore Parquet tables.  This option is only used
-  // internally.
-  private[sql] val METASTORE_SCHEMA = "metastoreSchema"
-
-  private[parquet] def readSchema(
-      footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
-    footers.map { footer =>
-      val metadata = footer.getParquetMetadata.getFileMetaData
-      val parquetSchema = metadata.getSchema
-      val maybeSparkSchema = metadata
-        .getKeyValueMetaData
-        .toMap
-        .get(RowReadSupport.SPARK_METADATA_KEY)
-        .flatMap { serializedSchema =>
-          // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to
-          // whatever is available.
-          Try(DataType.fromJson(serializedSchema))
-            .recover { case _: Throwable =>
-              logInfo(
-                s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
-                  "falling back to the deprecated DataType.fromCaseClassString parser.")
-              DataType.fromCaseClassString(serializedSchema)
-            }
-            .recover { case cause: Throwable =>
-              logWarning(
-                s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
-                   |\t$serializedSchema
-                 """.stripMargin,
-                cause)
-            }
-            .map(_.asInstanceOf[StructType])
-            .toOption
-        }
-
-      maybeSparkSchema.getOrElse {
-        // Falls back to Parquet schema if Spark SQL schema is absent.
-        StructType.fromAttributes(
-          // TODO Really no need to use `Attribute` here, we only need to know the data type.
-          ParquetTypesConverter.convertToAttributes(
-            parquetSchema,
-            sqlContext.conf.isParquetBinaryAsString,
-            sqlContext.conf.isParquetINT96AsTimestamp))
-      }
-    }.reduceOption { (left, right) =>
-      try left.merge(right) catch { case e: Throwable =>
-        throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
-      }
-    }
-  }
-
-  /**
-   * Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore
-   * schema and Parquet schema.
-   *
-   * Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the
-   * schema read from Parquet files may be incomplete (e.g. older versions of Parquet doesn't
-   * distinguish binary and string).  This method generates a correct schema by merging Metastore
-   * schema data types and Parquet schema field names.
-   */
-  private[parquet] def mergeMetastoreParquetSchema(
-      metastoreSchema: StructType,
-      parquetSchema: StructType): StructType = {
-    def schemaConflictMessage: String =
-      s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
-         |${metastoreSchema.prettyJson}
-         |
-         |Parquet schema:
-         |${parquetSchema.prettyJson}
-       """.stripMargin
-
-    val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)
-
-    assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)
-
-    val ordinalMap = metastoreSchema.zipWithIndex.map {
-      case (field, index) => field.name.toLowerCase -> index
-    }.toMap
-
-    val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
-      ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
-
-    StructType(metastoreSchema.zip(reorderedParquetSchema).map {
-      // Uses Parquet field names but retains Metastore data types.
-      case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
-        mSchema.copy(name = pSchema.name)
-      case _ =>
-        throw new SparkException(schemaConflictMessage)
-    })
-  }
-
-  /**
-   * Returns the original schema from the Parquet file with any missing nullable fields from the
-   * Hive Metastore schema merged in.
-   *
-   * When constructing a DataFrame from a collection of structured data, the resulting object has
-   * a schema corresponding to the union of the fields present in each element of the collection.
-   * Spark SQL simply assigns a null value to any field that isn't present for a particular row.
-   * In some cases, it is possible that a given table partition stored as a Parquet file doesn't
-   * contain a particular nullable field in its schema despite that field being present in the
-   * table schema obtained from the Hive Metastore. This method returns a schema representing the
-   * Parquet file schema along with any additional nullable fields from the Metastore schema
-   * merged in.
-   */
-  private[parquet] def mergeMissingNullableFields(
-      metastoreSchema: StructType,
-      parquetSchema: StructType): StructType = {
-    val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
-    val missingFields = metastoreSchema
-      .map(_.name.toLowerCase)
-      .diff(parquetSchema.map(_.name.toLowerCase))
-      .map(fieldMap(_))
-      .filter(_.nullable)
-    StructType(parquetSchema ++ missingFields)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
new file mode 100644
index 0000000..946062f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+import scala.util.Try
+
+import com.google.common.base.Objects
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import parquet.filter2.predicate.FilterApi
+import parquet.format.converter.ParquetMetadataConverter
+import parquet.hadoop._
+import parquet.hadoop.metadata.CompressionCodecName
+import parquet.hadoop.util.ContextUtil
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD._
+import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.{Row, SQLConf, SQLContext}
+import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
+
+private[sql] class DefaultSource extends HadoopFsRelationProvider {
+  override def createRelation(
+      sqlContext: SQLContext,
+      paths: Array[String],
+      schema: Option[StructType],
+      partitionColumns: Option[StructType],
+      parameters: Map[String, String]): HadoopFsRelation = {
+    val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty))
+    new ParquetRelation2(paths, schema, partitionSpec, parameters)(sqlContext)
+  }
+}
+
+// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
+private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext)
+  extends OutputWriter {
+
+  private val recordWriter: RecordWriter[Void, Row] = {
+    val conf = context.getConfiguration
+    val outputFormat = {
+      // When appending new Parquet files to an existing Parquet file directory, to avoid
+      // overwriting existing data files, we need to find out the max task ID encoded in these data
+      // file names.
+      // TODO Make this snippet a utility function for other data source developers
+      val maxExistingTaskId = {
+        // Note that `path` may point to a temporary location.  Here we retrieve the real
+        // destination path from the configuration
+        val outputPath = new Path(conf.get("spark.sql.sources.output.path"))
+        val fs = outputPath.getFileSystem(conf)
+
+        if (fs.exists(outputPath)) {
+          // Pattern used to match task ID in part file names, e.g.:
+          //
+          //   part-r-00001.gz.parquet
+          //          ^~~~~
+          val partFilePattern = """part-.-(\d{1,}).*""".r
+
+          fs.listStatus(outputPath).map(_.getPath.getName).map {
+            case partFilePattern(id) => id.toInt
+            case name if name.startsWith("_") => 0
+            case name if name.startsWith(".") => 0
+            case name => sys.error(
+              s"Trying to write Parquet files to directory $outputPath, " +
+                s"but found items with illegal name '$name'.")
+          }.reduceOption(_ max _).getOrElse(0)
+        } else {
+          0
+        }
+      }
+
+      new ParquetOutputFormat[Row]() {
+        // Here we override `getDefaultWorkFile` for two reasons:
+        //
+        //  1. To allow appending.  We need to generate output file name based on the max available
+        //     task ID computed above.
+        //
+        //  2. To allow dynamic partitioning.  Default `getDefaultWorkFile` uses
+        //     `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
+        //     partitions in the case of dynamic partitioning.
+        override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+          val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1
+          new Path(path, f"part-r-$split%05d$extension")
+        }
+      }
+    }
+
+    outputFormat.getRecordWriter(context)
+  }
+
+  override def write(row: Row): Unit = recordWriter.write(null, row)
+
+  override def close(): Unit = recordWriter.close(context)
+}
+
+private[sql] class ParquetRelation2(
+    override val paths: Array[String],
+    private val maybeDataSchema: Option[StructType],
+    private val maybePartitionSpec: Option[PartitionSpec],
+    parameters: Map[String, String])(
+    val sqlContext: SQLContext)
+  extends HadoopFsRelation(maybePartitionSpec)
+  with Logging {
+
+  // Should we merge schemas from all Parquet part-files?
+  private val shouldMergeSchemas =
+    parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean
+
+  private val maybeMetastoreSchema = parameters
+    .get(ParquetRelation2.METASTORE_SCHEMA)
+    .map(DataType.fromJson(_).asInstanceOf[StructType])
+
+  private lazy val metadataCache: MetadataCache = {
+    val meta = new MetadataCache
+    meta.refresh()
+    meta
+  }
+
+  override def equals(other: scala.Any): Boolean = other match {
+    case that: ParquetRelation2 =>
+      val schemaEquality = if (shouldMergeSchemas) {
+        this.shouldMergeSchemas == that.shouldMergeSchemas
+      } else {
+        this.dataSchema == that.dataSchema &&
+          this.schema == that.schema
+      }
+
+      this.paths.toSet == that.paths.toSet &&
+        schemaEquality &&
+        this.maybeDataSchema == that.maybeDataSchema &&
+        this.partitionColumns == that.partitionColumns
+
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    if (shouldMergeSchemas) {
+      Objects.hashCode(
+        Boolean.box(shouldMergeSchemas),
+        paths.toSet,
+        maybeDataSchema,
+        maybePartitionSpec)
+    } else {
+      Objects.hashCode(
+        Boolean.box(shouldMergeSchemas),
+        paths.toSet,
+        dataSchema,
+        schema,
+        maybeDataSchema,
+        maybePartitionSpec)
+    }
+  }
+
+  override def dataSchema: StructType = metadataCache.dataSchema
+
+  override private[sql] def refresh(): Unit = {
+    metadataCache.refresh()
+    super.refresh()
+  }
+
+  // Parquet data source always uses Catalyst internal representations.
+  override val needConversion: Boolean = false
+
+  override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum
+
+  override def userDefinedPartitionColumns: Option[StructType] =
+    maybePartitionSpec.map(_.partitionColumns)
+
+  override def prepareJobForWrite(job: Job): OutputWriterFactory = {
+    val conf = ContextUtil.getConfiguration(job)
+
+    val committerClass =
+      conf.getClass(
+        "spark.sql.parquet.output.committer.class",
+        classOf[ParquetOutputCommitter],
+        classOf[ParquetOutputCommitter])
+
+    conf.setClass(
+      "mapred.output.committer.class",
+      committerClass,
+      classOf[ParquetOutputCommitter])
+
+    // TODO There's no need to use two kinds of WriteSupport
+    // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
+    // complex types.
+    val writeSupportClass =
+      if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
+        classOf[MutableRowWriteSupport]
+      } else {
+        classOf[RowWriteSupport]
+      }
+
+    ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)
+    RowWriteSupport.setSchema(dataSchema.toAttributes, conf)
+
+    // Sets compression scheme
+    conf.set(
+      ParquetOutputFormat.COMPRESSION,
+      ParquetRelation
+        .shortParquetCompressionCodecNames
+        .getOrElse(
+          sqlContext.conf.parquetCompressionCodec.toUpperCase,
+          CompressionCodecName.UNCOMPRESSED).name())
+
+    new OutputWriterFactory {
+      override def newInstance(
+          path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = {
+        new ParquetOutputWriter(path, context)
+      }
+    }
+  }
+
+  override def buildScan(
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      inputPaths: Array[String]): RDD[Row] = {
+
+    val job = new Job(SparkHadoopUtil.get.conf)
+    val conf = ContextUtil.getConfiguration(job)
+
+    ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
+
+    if (inputPaths.nonEmpty) {
+      FileInputFormat.setInputPaths(job, inputPaths.map(new Path(_)): _*)
+    }
+
+    // Try to push down filters when filter push-down is enabled.
+    if (sqlContext.conf.parquetFilterPushDown) {
+      filters
+        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+        // is used here.
+        .flatMap(ParquetFilters.createFilter(dataSchema, _))
+        .reduceOption(FilterApi.and)
+        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
+    }
+
+    conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
+      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
+      ParquetTypesConverter.convertToString(requestedSchema.toAttributes)
+    })
+
+    conf.set(
+      RowWriteSupport.SPARK_ROW_SCHEMA,
+      ParquetTypesConverter.convertToString(dataSchema.toAttributes))
+
+    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
+    conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
+
+    val inputFileStatuses =
+      metadataCache.dataStatuses.filter(f => inputPaths.contains(f.getPath.toString))
+
+    val footers = inputFileStatuses.map(metadataCache.footers)
+
+    // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
+    // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
+    // footers.  Especially when a global arbitrative schema (either from metastore or data source
+    // DDL) is available.
+    new NewHadoopRDD(
+      sqlContext.sparkContext,
+      classOf[FilteringParquetRowInputFormat],
+      classOf[Void],
+      classOf[Row],
+      conf) {
+
+      val cacheMetadata = useMetadataCache
+
+      @transient val cachedStatuses = inputFileStatuses.map { f =>
+        // In order to encode the authority of a Path containing special characters such as /,
+        // we need to use the string returned by the URI of the path to create a new Path.
+        val pathWithAuthority = new Path(f.getPath.toUri.toString)
+
+        new FileStatus(
+          f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
+          f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
+      }.toSeq
+
+      @transient val cachedFooters = footers.map { f =>
+        // In order to encode the authority of a Path containing special characters such as /,
+        // we need to use the string returned by the URI of the path to create a new Path.
+        new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
+      }.toSeq
+
+      // Overridden so we can inject our own cached files statuses.
+      override def getPartitions: Array[SparkPartition] = {
+        val inputFormat = if (cacheMetadata) {
+          new FilteringParquetRowInputFormat {
+            override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
+
+            override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
+          }
+        } else {
+          new FilteringParquetRowInputFormat
+        }
+
+        val jobContext = newJobContext(getConf, jobId)
+        val rawSplits = inputFormat.getSplits(jobContext)
+
+        Array.tabulate[SparkPartition](rawSplits.size) { i =>
+          new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+        }
+      }
+    }.values
+  }
+
+  private class MetadataCache {
+    // `FileStatus` objects of all "_metadata" files.
+    private var metadataStatuses: Array[FileStatus] = _
+
+    // `FileStatus` objects of all "_common_metadata" files.
+    private var commonMetadataStatuses: Array[FileStatus] = _
+
+    // Parquet footer cache.
+    var footers: Map[FileStatus, Footer] = _
+
+    // `FileStatus` objects of all data files (Parquet part-files).
+    var dataStatuses: Array[FileStatus] = _
+
+    // Schema of the actual Parquet files, without partition columns discovered from partition
+    // directory paths.
+    var dataSchema: StructType = _
+
+    // Schema of the whole table, including partition columns.
+    var schema: StructType = _
+
+    /**
+     * Refreshes `FileStatus`es, footers, partition spec, and table schema.
+     */
+    def refresh(): Unit = {
+      // Support either reading a collection of raw Parquet part-files, or a collection of folders
+      // containing Parquet files (e.g. partitioned Parquet table).
+      val baseStatuses = paths.distinct.flatMap { p =>
+        val path = new Path(p)
+        val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+        val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+        Try(fs.getFileStatus(qualified)).toOption
+      }
+      assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir))
+
+      // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
+      val leaves = baseStatuses.flatMap { f =>
+        val fs = FileSystem.get(f.getPath.toUri, SparkHadoopUtil.get.conf)
+        SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
+          isSummaryFile(f.getPath) ||
+            !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
+        }
+      }
+
+      dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
+      metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
+      commonMetadataStatuses =
+        leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
+
+      footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
+        val parquetMetadata = ParquetFileReader.readFooter(
+          SparkHadoopUtil.get.conf, f, ParquetMetadataConverter.NO_FILTER)
+        f -> new Footer(f.getPath, parquetMetadata)
+      }.seq.toMap
+
+      dataSchema = {
+        val dataSchema0 =
+          maybeDataSchema
+            .orElse(readSchema())
+            .orElse(maybeMetastoreSchema)
+            .getOrElse(sys.error("Failed to get the schema."))
+
+        // If this Parquet relation is converted from a Hive Metastore table, must reconcile case
+        // case insensitivity issue and possible schema mismatch (probably caused by schema
+        // evolution).
+        maybeMetastoreSchema
+          .map(ParquetRelation2.mergeMetastoreParquetSchema(_, dataSchema0))
+          .getOrElse(dataSchema0)
+      }
+    }
+
+    private def isSummaryFile(file: Path): Boolean = {
+      file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
+        file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
+    }
+
+    private def readSchema(): Option[StructType] = {
+      // Sees which file(s) we need to touch in order to figure out the schema.
+      //
+      // Always tries the summary files first if users don't require a merged schema.  In this case,
+      // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
+      // groups information, and could be much smaller for large Parquet files with lots of row
+      // groups.
+      //
+      // NOTE: Metadata stored in the summary files are merged from all part-files.  However, for
+      // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
+      // how to merge them correctly if some key is associated with different values in different
+      // part-files.  When this happens, Parquet simply gives up generating the summary file.  This
+      // implies that if a summary file presents, then:
+      //
+      //   1. Either all part-files have exactly the same Spark SQL schema, or
+      //   2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
+      //      their schemas may differ from each other).
+      //
+      // Here we tend to be pessimistic and take the second case into account.  Basically this means
+      // we can't trust the summary files if users require a merged schema, and must touch all part-
+      // files to do the merge.
+      val filesToTouch =
+        if (shouldMergeSchemas) {
+          // Also includes summary files, 'cause there might be empty partition directories.
+          (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
+        } else {
+          // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
+          // don't have this.
+          commonMetadataStatuses.headOption
+            // Falls back to "_metadata"
+            .orElse(metadataStatuses.headOption)
+            // Summary file(s) not found, the Parquet file is either corrupted, or different part-
+            // files contain conflicting user defined metadata (two or more values are associated
+            // with a same key in different files).  In either case, we fall back to any of the
+            // first part-file, and just assume all schemas are consistent.
+            .orElse(dataStatuses.headOption)
+            .toSeq
+        }
+
+      assert(
+        filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined,
+        "No schema defined, " +
+          s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")
+
+      ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
+    }
+  }
+}
+
+private[sql] object ParquetRelation2 extends Logging {
+  // Whether we should merge schemas collected from all Parquet part-files.
+  private[sql] val MERGE_SCHEMA = "mergeSchema"
+
+  // Hive Metastore schema, used when converting Metastore Parquet tables.  This option is only used
+  // internally.
+  private[sql] val METASTORE_SCHEMA = "metastoreSchema"
+
+  private[parquet] def readSchema(
+      footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
+    footers.map { footer =>
+      val metadata = footer.getParquetMetadata.getFileMetaData
+      val parquetSchema = metadata.getSchema
+      val maybeSparkSchema = metadata
+        .getKeyValueMetaData
+        .toMap
+        .get(RowReadSupport.SPARK_METADATA_KEY)
+        .flatMap { serializedSchema =>
+          // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to
+          // whatever is available.
+          Try(DataType.fromJson(serializedSchema))
+            .recover { case _: Throwable =>
+              logInfo(
+                s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+                  "falling back to the deprecated DataType.fromCaseClassString parser.")
+              DataType.fromCaseClassString(serializedSchema)
+            }
+            .recover { case cause: Throwable =>
+              logWarning(
+                s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
+                   |\t$serializedSchema
+                 """.stripMargin,
+                cause)
+            }
+            .map(_.asInstanceOf[StructType])
+            .toOption
+        }
+
+      maybeSparkSchema.getOrElse {
+        // Falls back to Parquet schema if Spark SQL schema is absent.
+        StructType.fromAttributes(
+          // TODO Really no need to use `Attribute` here, we only need to know the data type.
+          ParquetTypesConverter.convertToAttributes(
+            parquetSchema,
+            sqlContext.conf.isParquetBinaryAsString,
+            sqlContext.conf.isParquetINT96AsTimestamp))
+      }
+    }.reduceOption { (left, right) =>
+      try left.merge(right) catch { case e: Throwable =>
+        throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
+      }
+    }
+  }
+
+  /**
+   * Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore
+   * schema and Parquet schema.
+   *
+   * Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the
+   * schema read from Parquet files may be incomplete (e.g. older versions of Parquet doesn't
+   * distinguish binary and string).  This method generates a correct schema by merging Metastore
+   * schema data types and Parquet schema field names.
+   */
+  private[parquet] def mergeMetastoreParquetSchema(
+      metastoreSchema: StructType,
+      parquetSchema: StructType): StructType = {
+    def schemaConflictMessage: String =
+      s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
+         |${metastoreSchema.prettyJson}
+         |
+         |Parquet schema:
+         |${parquetSchema.prettyJson}
+       """.stripMargin
+
+    val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)
+
+    assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)
+
+    val ordinalMap = metastoreSchema.zipWithIndex.map {
+      case (field, index) => field.name.toLowerCase -> index
+    }.toMap
+
+    val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
+      ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
+
+    StructType(metastoreSchema.zip(reorderedParquetSchema).map {
+      // Uses Parquet field names but retains Metastore data types.
+      case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
+        mSchema.copy(name = pSchema.name)
+      case _ =>
+        throw new SparkException(schemaConflictMessage)
+    })
+  }
+
+  /**
+   * Returns the original schema from the Parquet file with any missing nullable fields from the
+   * Hive Metastore schema merged in.
+   *
+   * When constructing a DataFrame from a collection of structured data, the resulting object has
+   * a schema corresponding to the union of the fields present in each element of the collection.
+   * Spark SQL simply assigns a null value to any field that isn't present for a particular row.
+   * In some cases, it is possible that a given table partition stored as a Parquet file doesn't
+   * contain a particular nullable field in its schema despite that field being present in the
+   * table schema obtained from the Hive Metastore. This method returns a schema representing the
+   * Parquet file schema along with any additional nullable fields from the Metastore schema
+   * merged in.
+   */
+  private[parquet] def mergeMissingNullableFields(
+      metastoreSchema: StructType,
+      parquetSchema: StructType): StructType = {
+    val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
+    val missingFields = metastoreSchema
+      .map(_.name.toLowerCase)
+      .diff(parquetSchema.map(_.name.toLowerCase))
+      .map(fieldMap(_))
+      .filter(_.nullable)
+    StructType(parquetSchema ++ missingFields)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index ee099ab..e6324b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -59,7 +59,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         (a, _) => t.buildScan(a)) :: Nil
 
     // Scanning partitioned FSBasedRelation
-    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: FSBasedRelation))
+    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation))
         if t.partitionSpec.partitionColumns.nonEmpty =>
       val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
 
@@ -87,7 +87,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         selectedPartitions) :: Nil
 
     // Scanning non-partitioned FSBasedRelation
-    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: FSBasedRelation)) =>
+    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
       val inputPaths = t.paths.map(new Path(_)).flatMap { path =>
         val fs = path.getFileSystem(t.sqlContext.sparkContext.hadoopConfiguration)
         val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
@@ -111,10 +111,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
 
     case i @ logical.InsertIntoTable(
-      l @ LogicalRelation(t: FSBasedRelation), part, query, overwrite, false) if part.isEmpty =>
+      l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) if part.isEmpty =>
       val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
       execution.ExecutedCommand(
-        InsertIntoFSBasedRelation(t, query, Array.empty[String], mode)) :: Nil
+        InsertIntoHadoopFsRelation(t, query, Array.empty[String], mode)) :: Nil
 
     case _ => Nil
   }
@@ -126,7 +126,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       partitionColumns: StructType,
       partitions: Array[Partition]) = {
     val output = projections.map(_.toAttribute)
-    val relation = logicalRelation.relation.asInstanceOf[FSBasedRelation]
+    val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]
 
     // Builds RDD[Row]s for each selected partition.
     val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index d30f7f6..d1f0cda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -35,6 +35,10 @@ private[sql] case class Partition(values: Row, path: String)
 private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition])
 
 private[sql] object PartitioningUtils {
+  // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't
+  // depend on Hive.
+  private[sql] val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"
+
   private[sql] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) {
     require(columnNames.size == literals.size)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 7879328..a09bb08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -58,8 +58,8 @@ private[sql] case class InsertIntoDataSource(
   }
 }
 
-private[sql] case class InsertIntoFSBasedRelation(
-    @transient relation: FSBasedRelation,
+private[sql] case class InsertIntoHadoopFsRelation(
+    @transient relation: HadoopFsRelation,
     @transient query: LogicalPlan,
     partitionColumns: Array[String],
     mode: SaveMode)
@@ -102,7 +102,7 @@ private[sql] case class InsertIntoFSBasedRelation(
         insert(new DefaultWriterContainer(relation, job), df)
       } else {
         val writerContainer = new DynamicPartitionWriterContainer(
-          relation, job, partitionColumns, "__HIVE_DEFAULT_PARTITION__")
+          relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME)
         insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
       }
     }
@@ -234,7 +234,7 @@ private[sql] case class InsertIntoFSBasedRelation(
 }
 
 private[sql] abstract class BaseWriterContainer(
-    @transient val relation: FSBasedRelation,
+    @transient val relation: HadoopFsRelation,
     @transient job: Job)
   extends SparkHadoopMapReduceUtil
   with Logging
@@ -261,7 +261,7 @@ private[sql] abstract class BaseWriterContainer(
 
   protected val dataSchema = relation.dataSchema
 
-  protected val outputWriterClass: Class[_ <: OutputWriter] = relation.outputWriterClass
+  protected var outputWriterFactory: OutputWriterFactory = _
 
   private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _
 
@@ -269,7 +269,7 @@ private[sql] abstract class BaseWriterContainer(
     setupIDs(0, 0, 0)
     setupConf()
     taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
-    relation.prepareForWrite(job)
+    outputWriterFactory = relation.prepareJobForWrite(job)
     outputFormatClass = job.getOutputFormatClass
     outputCommitter = newOutputCommitter(taskAttemptContext)
     outputCommitter.setupJob(jobContext)
@@ -346,16 +346,15 @@ private[sql] abstract class BaseWriterContainer(
 }
 
 private[sql] class DefaultWriterContainer(
-    @transient relation: FSBasedRelation,
+    @transient relation: HadoopFsRelation,
     @transient job: Job)
   extends BaseWriterContainer(relation, job) {
 
   @transient private var writer: OutputWriter = _
 
   override protected def initWriters(): Unit = {
-    writer = outputWriterClass.newInstance()
     taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath)
-    writer.init(getWorkPath, dataSchema, taskAttemptContext)
+    writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
   }
 
   override def outputWriterForRow(row: Row): OutputWriter = writer
@@ -372,7 +371,7 @@ private[sql] class DefaultWriterContainer(
 }
 
 private[sql] class DynamicPartitionWriterContainer(
-    @transient relation: FSBasedRelation,
+    @transient relation: HadoopFsRelation,
     @transient job: Job,
     partitionColumns: Array[String],
     defaultPartitionName: String)
@@ -398,12 +397,10 @@ private[sql] class DynamicPartitionWriterContainer(
 
     outputWriters.getOrElseUpdate(partitionPath, {
       val path = new Path(getWorkPath, partitionPath)
-      val writer = outputWriterClass.newInstance()
       taskAttemptContext.getConfiguration.set(
         "spark.sql.sources.output.path",
         new Path(outputPath, partitionPath).toString)
-      writer.init(path.toString, dataSchema, taskAttemptContext)
-      writer
+      outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
     })
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 595c5eb..37a569d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -226,7 +226,7 @@ private[sql] object ResolvedDataSource {
       case Some(schema: StructType) => clazz.newInstance() match {
         case dataSource: SchemaRelationProvider =>
           dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
-        case dataSource: FSBasedRelationProvider =>
+        case dataSource: HadoopFsRelationProvider =>
           val maybePartitionsSchema = if (partitionColumns.isEmpty) {
             None
           } else {
@@ -256,7 +256,7 @@ private[sql] object ResolvedDataSource {
       case None => clazz.newInstance() match {
         case dataSource: RelationProvider =>
           dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
-        case dataSource: FSBasedRelationProvider =>
+        case dataSource: HadoopFsRelationProvider =>
           val caseInsensitiveOptions = new CaseInsensitiveMap(options)
           val paths = {
             val patternPath = new Path(caseInsensitiveOptions("path"))
@@ -296,7 +296,7 @@ private[sql] object ResolvedDataSource {
     val relation = clazz.newInstance() match {
       case dataSource: CreatableRelationProvider =>
         dataSource.createRelation(sqlContext, mode, options, data)
-      case dataSource: FSBasedRelationProvider =>
+      case dataSource: HadoopFsRelationProvider =>
         // Don't glob path for the write path.  The contracts here are:
         //  1. Only one output path can be specified on the write path;
         //  2. Output path must be a legal HDFS style file system path;
@@ -315,7 +315,7 @@ private[sql] object ResolvedDataSource {
           Some(partitionColumnsSchema(data.schema, partitionColumns)),
           caseInsensitiveOptions)
         sqlContext.executePlan(
-          InsertIntoFSBasedRelation(
+          InsertIntoHadoopFsRelation(
             r,
             data.logicalPlan,
             partitionColumns.toArray,

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 6f31530..274ab44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, _}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -94,7 +94,7 @@ trait SchemaRelationProvider {
  * ::DeveloperApi::
  * Implemented by objects that produce relations for a specific kind of data source
  * with a given schema and partitioned columns.  When Spark SQL is given a DDL operation with a
- * USING clause specified (to specify the implemented [[FSBasedRelationProvider]]), a user defined
+ * USING clause specified (to specify the implemented [[HadoopFsRelationProvider]]), a user defined
  * schema, and an optional list of partition columns, this interface is used to pass in the
  * parameters specified by a user.
  *
@@ -105,15 +105,15 @@ trait SchemaRelationProvider {
  *
  * A new instance of this class with be instantiated each time a DDL call is made.
  *
- * The difference between a [[RelationProvider]] and a [[FSBasedRelationProvider]] is
+ * The difference between a [[RelationProvider]] and a [[HadoopFsRelationProvider]] is
  * that users need to provide a schema and a (possibly empty) list of partition columns when
  * using a SchemaRelationProvider. A relation provider can inherits both [[RelationProvider]],
- * and [[FSBasedRelationProvider]] if it can support schema inference, user-specified
+ * and [[HadoopFsRelationProvider]] if it can support schema inference, user-specified
  * schemas, and accessing partitioned relations.
  *
  * @since 1.4.0
  */
-trait FSBasedRelationProvider {
+trait HadoopFsRelationProvider {
   /**
    * Returns a new base relation with the given parameters, a user defined schema, and a list of
    * partition columns. Note: the parameters' keywords are case insensitive and this insensitivity
@@ -124,7 +124,7 @@ trait FSBasedRelationProvider {
       paths: Array[String],
       schema: Option[StructType],
       partitionColumns: Option[StructType],
-      parameters: Map[String, String]): FSBasedRelation
+      parameters: Map[String, String]): HadoopFsRelation
 }
 
 /**
@@ -280,33 +280,42 @@ trait CatalystScan {
 
 /**
  * ::Experimental::
- * [[OutputWriter]] is used together with [[FSBasedRelation]] for persisting rows to the
- * underlying file system.  Subclasses of [[OutputWriter]] must provide a zero-argument constructor.
- * An [[OutputWriter]] instance is created and initialized when a new output file is opened on
- * executor side.  This instance is used to persist rows to this single output file.
+ * A factory that produces [[OutputWriter]]s.  A new [[OutputWriterFactory]] is created on driver
+ * side for each write job issued when writing to a [[HadoopFsRelation]], and then gets serialized
+ * to executor side to create actual [[OutputWriter]]s on the fly.
  *
  * @since 1.4.0
  */
 @Experimental
-abstract class OutputWriter {
+abstract class OutputWriterFactory extends Serializable {
   /**
-   * Initializes this [[OutputWriter]] before any rows are persisted.
+   * When writing to a [[HadoopFsRelation]], this method gets called by each task on executor side
+   * to instantiate new [[OutputWriter]]s.
    *
    * @param path Path of the file to which this [[OutputWriter]] is supposed to write.  Note that
    *        this may not point to the final output file.  For example, `FileOutputFormat` writes to
    *        temporary directories and then merge written files back to the final destination.  In
    *        this case, `path` points to a temporary output file under the temporary directory.
    * @param dataSchema Schema of the rows to be written. Partition columns are not included in the
-   *        schema if the corresponding relation is partitioned.
+   *        schema if the relation being written is partitioned.
    * @param context The Hadoop MapReduce task context.
    *
    * @since 1.4.0
    */
-  def init(
-      path: String,
-      dataSchema: StructType,
-      context: TaskAttemptContext): Unit = ()
+  def newInstance(path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter
+}
 
+/**
+ * ::Experimental::
+ * [[OutputWriter]] is used together with [[HadoopFsRelation]] for persisting rows to the
+ * underlying file system.  Subclasses of [[OutputWriter]] must provide a zero-argument constructor.
+ * An [[OutputWriter]] instance is created and initialized when a new output file is opened on
+ * executor side.  This instance is used to persist rows to this single output file.
+ *
+ * @since 1.4.0
+ */
+@Experimental
+abstract class OutputWriter {
   /**
    * Persists a single row.  Invoked on the executor side.  When writing to dynamically partitioned
    * tables, dynamic partition columns are not included in rows to be written.
@@ -333,74 +342,71 @@ abstract class OutputWriter {
  * filter using selected predicates before producing an RDD containing all matching tuples as
  * [[Row]] objects. In addition, when reading from Hive style partitioned tables stored in file
  * systems, it's able to discover partitioning information from the paths of input directories, and
- * perform partition pruning before start reading the data. Subclasses of [[FSBasedRelation()]] must
- * override one of the three `buildScan` methods to implement the read path.
+ * perform partition pruning before start reading the data. Subclasses of [[HadoopFsRelation()]]
+ * must override one of the three `buildScan` methods to implement the read path.
  *
  * For the write path, it provides the ability to write to both non-partitioned and partitioned
  * tables.  Directory layout of the partitioned tables is compatible with Hive.
  *
  * @constructor This constructor is for internal uses only. The [[PartitionSpec]] argument is for
  *              implementing metastore table conversion.
- * @param paths Base paths of this relation.  For partitioned relations, it should be the root
- *        directories of all partition directories.
- * @param maybePartitionSpec An [[FSBasedRelation]] can be created with an optional
+ *
+ * @param maybePartitionSpec An [[HadoopFsRelation]] can be created with an optional
  *        [[PartitionSpec]], so that partition discovery can be skipped.
  *
  * @since 1.4.0
  */
 @Experimental
-abstract class FSBasedRelation private[sql](
-    val paths: Array[String],
-    maybePartitionSpec: Option[PartitionSpec])
+abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
   extends BaseRelation {
 
+  def this() = this(None)
+
+  private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+
+  private val codegenEnabled = sqlContext.conf.codegenEnabled
+
+  private var _partitionSpec: PartitionSpec = _
+
+  final private[sql] def partitionSpec: PartitionSpec = {
+    if (_partitionSpec == null) {
+      _partitionSpec = maybePartitionSpec
+        .map(spec => spec.copy(partitionColumns = spec.partitionColumns.asNullable))
+        .orElse(userDefinedPartitionColumns.map(PartitionSpec(_, Array.empty[Partition])))
+        .getOrElse {
+        if (sqlContext.conf.partitionDiscoveryEnabled()) {
+          discoverPartitions()
+        } else {
+          PartitionSpec(StructType(Nil), Array.empty[Partition])
+        }
+      }
+    }
+    _partitionSpec
+  }
+
   /**
-   * Constructs an [[FSBasedRelation]].
-   *
-   * @param paths Base paths of this relation.  For partitioned relations, it should be either root
-   *        directories of all partition directories.
-   * @param partitionColumns Partition columns of this relation.
+   * Base paths of this relation.  For partitioned relations, it should be either root directories
+   * of all partition directories.
    *
    * @since 1.4.0
    */
-  def this(paths: Array[String], partitionColumns: StructType) =
-    this(paths, {
-      if (partitionColumns.isEmpty) None
-      else Some(PartitionSpec(partitionColumns, Array.empty[Partition]))
-    })
+  def paths: Array[String]
 
   /**
-   * Constructs an [[FSBasedRelation]].
-   *
-   * @param paths Base paths of this relation.  For partitioned relations, it should be root
-   *        directories of all partition directories.
+   * Partition columns.  Can be either defined by [[userDefinedPartitionColumns]] or automatically
+   * discovered.  Note that they should always be nullable.
    *
    * @since 1.4.0
    */
-  def this(paths: Array[String]) = this(paths, None)
-
-  private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
-
-  private val codegenEnabled = sqlContext.conf.codegenEnabled
-
-  private var _partitionSpec: PartitionSpec = maybePartitionSpec.map { spec =>
-    spec.copy(partitionColumns = spec.partitionColumns.asNullable)
-  }.getOrElse {
-    if (sqlContext.conf.partitionDiscoveryEnabled()) {
-      discoverPartitions()
-    } else {
-      PartitionSpec(StructType(Nil), Array.empty[Partition])
-    }
-  }
-
-  private[sql] def partitionSpec: PartitionSpec = _partitionSpec
+  final def partitionColumns: StructType =
+    userDefinedPartitionColumns.getOrElse(partitionSpec.partitionColumns)
 
   /**
-   * Partition columns. Note that they are always nullable.
+   * Optional user defined partition columns.
    *
    * @since 1.4.0
    */
-  def partitionColumns: StructType = partitionSpec.partitionColumns
+  def userDefinedPartitionColumns: Option[StructType] = None
 
   private[sql] def refresh(): Unit = {
     if (sqlContext.conf.partitionDiscoveryEnabled()) {
@@ -419,7 +425,7 @@ abstract class FSBasedRelation private[sql](
     }.map(_.getPath)
 
     if (leafDirs.nonEmpty) {
-      PartitioningUtils.parsePartitions(leafDirs, "__HIVE_DEFAULT_PARTITION__")
+      PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
     } else {
       PartitionSpec(StructType(Array.empty[StructField]), Array.empty[Partition])
     }
@@ -458,7 +464,7 @@ abstract class FSBasedRelation private[sql](
    * @since 1.4.0
    */
   def buildScan(inputPaths: Array[String]): RDD[Row] = {
-    throw new RuntimeException(
+    throw new UnsupportedOperationException(
       "At least one buildScan() method should be overridden to read the relation.")
   }
 
@@ -520,8 +526,8 @@ abstract class FSBasedRelation private[sql](
   }
 
   /**
-   * Client side preparation for data writing can be put here.  For example, user defined output
-   * committer can be configured here.
+   * Prepares a write job and returns an [[OutputWriterFactory]].  Client side job preparation can
+   * be put here.  For example, user defined output committer can be configured here.
    *
    * Note that the only side effect expected here is mutating `job` via its setters.  Especially,
    * Spark SQL caches [[BaseRelation]] instances for performance, mutating relation internal states
@@ -529,13 +535,5 @@ abstract class FSBasedRelation private[sql](
    *
    * @since 1.4.0
    */
-  def prepareForWrite(job: Job): Unit = ()
-
-  /**
-   * This method is responsible for producing a new [[OutputWriter]] for each newly opened output
-   * file on the executor side.
-   *
-   * @since 1.4.0
-   */
-  def outputWriterClass: Class[_ <: OutputWriter]
+  def prepareJobForWrite(job: Job): OutputWriterFactory
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
index aad1d24..1eacdde 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
@@ -102,7 +102,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
         }
 
       case logical.InsertIntoTable(LogicalRelation(_: InsertableRelation), _, _, _, _) => // OK
-      case logical.InsertIntoTable(LogicalRelation(_: FSBasedRelation), _, _, _, _) => // OK
+      case logical.InsertIntoTable(LogicalRelation(_: HadoopFsRelation), _, _, _, _) => // OK
       case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) =>
         // The relation in l is not an InsertableRelation.
         failAnalysis(s"$l does not allow insertion.")

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index 3bbc5b0..5ad4395 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -63,7 +63,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
         }.flatten.reduceOption(_ && _)
 
         val forParquetDataSource = query.queryExecution.optimizedPlan.collect {
-          case PhysicalOperation(_, filters, LogicalRelation(_: FSBasedParquetRelation)) => filters
+          case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation2)) => filters
         }.flatten.reduceOption(_ && _)
 
         forParquetTableScan.orElse(forParquetDataSource)

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index fc90e3e..c964b6d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -204,7 +204,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
         StructField("lowerCase", StringType),
         StructField("UPPERCase", DoubleType, nullable = false)))) {
 
-      FSBasedParquetRelation.mergeMetastoreParquetSchema(
+      ParquetRelation2.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("lowercase", StringType),
           StructField("uppercase", DoubleType, nullable = false))),
@@ -219,7 +219,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
       StructType(Seq(
         StructField("UPPERCase", DoubleType, nullable = false)))) {
 
-      FSBasedParquetRelation.mergeMetastoreParquetSchema(
+      ParquetRelation2.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("uppercase", DoubleType, nullable = false))),
 
@@ -230,7 +230,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
 
     // Metastore schema contains additional non-nullable fields.
     assert(intercept[Throwable] {
-      FSBasedParquetRelation.mergeMetastoreParquetSchema(
+      ParquetRelation2.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("uppercase", DoubleType, nullable = false),
           StructField("lowerCase", BinaryType, nullable = false))),
@@ -241,7 +241,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
 
     // Conflicting non-nullable field names
     intercept[Throwable] {
-      FSBasedParquetRelation.mergeMetastoreParquetSchema(
+      ParquetRelation2.mergeMetastoreParquetSchema(
         StructType(Seq(StructField("lower", StringType, nullable = false))),
         StructType(Seq(StructField("lowerCase", BinaryType))))
     }
@@ -255,7 +255,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
         StructField("firstField", StringType, nullable = true),
         StructField("secondField", StringType, nullable = true),
         StructField("thirdfield", StringType, nullable = true)))) {
-      FSBasedParquetRelation.mergeMetastoreParquetSchema(
+      ParquetRelation2.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("firstfield", StringType, nullable = true),
           StructField("secondfield", StringType, nullable = true),
@@ -268,7 +268,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
     // Merge should fail if the Metastore contains any additional fields that are not
     // nullable.
     assert(intercept[Throwable] {
-      FSBasedParquetRelation.mergeMetastoreParquetSchema(
+      ParquetRelation2.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("firstfield", StringType, nullable = true),
           StructField("secondfield", StringType, nullable = true),

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index b0e82c8..2aa80b4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.parquet.FSBasedParquetRelation
+import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode, sources}
@@ -226,8 +226,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     // serialize the Metastore schema to JSON and pass it as a data source option because of the
     // evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
     val parquetOptions = Map(
-      FSBasedParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
-      FSBasedParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
+      ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
+      ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
     val tableIdentifier =
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
 
@@ -238,7 +238,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
       cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => None // Cache miss
-        case logical@LogicalRelation(parquetRelation: FSBasedParquetRelation) =>
+        case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
           // If we have the same paths, same schema, and same partition spec,
           // we will use the cached Parquet Relation.
           val useCached =
@@ -281,7 +281,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
       val parquetRelation = cached.getOrElse {
         val created = LogicalRelation(
-          new FSBasedParquetRelation(
+          new ParquetRelation2(
             paths.toArray, None, Some(partitionSpec), parquetOptions)(hive))
         cachedDataSourceTables.put(tableIdentifier, created)
         created
@@ -294,7 +294,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
       val parquetRelation = cached.getOrElse {
         val created = LogicalRelation(
-          new FSBasedParquetRelation(paths.toArray, None, None, parquetOptions)(hive))
+          new ParquetRelation2(paths.toArray, None, None, parquetOptions)(hive))
         cachedDataSourceTables.put(tableIdentifier, created)
         created
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 8e405e0..6609763 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -194,7 +194,7 @@ case class CreateMetastoreDataSourceAsSelect(
             sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath)
           val createdRelation = LogicalRelation(resolved.relation)
           EliminateSubQueries(sqlContext.table(tableName).logicalPlan) match {
-            case l @ LogicalRelation(_: InsertableRelation | _: FSBasedRelation) =>
+            case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation) =>
               if (l.relation != createdRelation.relation) {
                 val errorDescription =
                   s"Cannot append to table $tableName because the resolved relation does not " +

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf5bba3/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index da5d203..1bf1c1b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.parquet.FSBasedParquetRelation
+import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -579,11 +579,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       )
 
       table("test_parquet_ctas").queryExecution.optimizedPlan match {
-        case LogicalRelation(p: FSBasedParquetRelation) => // OK
+        case LogicalRelation(p: ParquetRelation2) => // OK
         case _ =>
           fail(
             "test_parquet_ctas should be converted to " +
-            s"${classOf[FSBasedParquetRelation].getCanonicalName}")
+            s"${classOf[ParquetRelation2].getCanonicalName}")
       }
 
       // Clenup and reset confs.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org