Posted to commits@spark.apache.org by ma...@apache.org on 2015/02/06 00:30:01 UTC

[2/2] spark git commit: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-3575] [SQL] Parquet data source improvements

[SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-3575] [SQL] Parquet data source improvements

This PR adds three major improvements to Parquet data source:

1.  Partition discovery

    When reading Parquet files that reside in Hive-style partition directories, `ParquetRelation2` automatically discovers the partitioning information and infers partition column types (see the usage sketch after this list).

    This is also partial work for [SPARK-5182] [1], which aims to provide first-class partitioning support for the data source API.  Related code in this PR can easily be extracted to the data source API level in future versions.

1.  Schema merging

    When enabled, the Parquet data source collects schema information from all Parquet part-files and tries to merge them, throwing an exception when incompatible schemas are detected.  This feature is controlled by the data source option `mergeSchema` and is enabled by default (also illustrated in the sketch after this list).

1.  Metastore Parquet table conversion moved to analysis phase

    This greatly simplifies the conversion logic.  The `ParquetConversion` strategy can be removed once the old Parquet implementation is removed in the future.
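
The sketch below is illustrative only: the paths are hypothetical, and it assumes the new data source is enabled (`spark.sql.parquet.useDataSourceApi=true`, the default after this PR).  It shows how the first two improvements surface when reading a Hive-style layout.

```scala
// Hypothetical layout on HDFS:
//   /data/events/date=2015-01-01/part-r-00001.parquet
//   /data/events/date=2015-01-02/part-r-00001.parquet
// Reading the base directory discovers the `date` partition column and infers its type.
// Schema merging across part-files (the `mergeSchema` relation option, on by default)
// happens while the relation's schema is resolved.
val events = sqlContext.parquetFile("hdfs://namenode:8020/data/events")
events.printSchema()

// Predicates on the partition column only touch the matching directories.
events.filter(events("date") === "2015-01-02").count()
```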

This version of the Parquet data source aims to entirely replace the old Parquet implementation.  However, the old version hasn't been removed yet.  Users can fall back to it by turning off the SQL configuration `spark.sql.parquet.useDataSourceApi`.
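
A minimal sketch of the fallback; the key matches `SQLConf.PARQUET_USE_DATA_SOURCE_API` added in this PR.

```scala
// Revert to the old Parquet implementation for this SQLContext.
sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")
// Or, equivalently, from SQL:
sqlContext.sql("SET spark.sql.parquet.useDataSourceApi=false")
```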

Other JIRA tickets fixed as side effects in this PR:

- [SPARK-5509] [3]: `EqualTo` now uses a proper `Ordering` to compare binary types.

- [SPARK-3575] [4]: Metastore schema is now preserved and passed to `ParquetRelation2` via the data source option `metastoreSchema`.

TODO:

- [ ] More test cases for partition discovery
- [x] Fix write path after data source write support (#4294) is merged

      It turned out to be non-trivial to fall back to the old Parquet implementation on the write path when the Parquet data source is enabled.  Since we're planning to include data source write support in 1.3.0, I simply ignored two test cases involving Parquet insertion for now.

- [ ] Fix outdated comments and documentation

PS: This PR looks big, but more than half of the changed lines are trivial changes to test cases.  To exercise Parquet both with and without the new data source, almost all Parquet test cases were moved into wrapper driver functions, which accounts for hundreds of the changed lines.

[1]: https://issues.apache.org/jira/browse/SPARK-5182
[2]: https://issues.apache.org/jira/browse/SPARK-5528
[3]: https://issues.apache.org/jira/browse/SPARK-5509
[4]: https://issues.apache.org/jira/browse/SPARK-3575


Author: Cheng Lian <li...@databricks.com>

Closes #4308 from liancheng/parquet-partition-discovery and squashes the following commits:

b6946e6 [Cheng Lian] Fixes MiMA issues, addresses comments
8232e17 [Cheng Lian] Write support for Parquet data source
a49bd28 [Cheng Lian] Fixes spelling typo in trait name "CreateableRelationProvider"
808380f [Cheng Lian] Fixes issues introduced while rebasing
50dd8d1 [Cheng Lian] Addresses @rxin's comment, fixes UDT schema merging
adf2aae [Cheng Lian] Fixes compilation error introduced while rebasing
4e0175f [Cheng Lian] Fixes Python Parquet API, we need Py4J array to call varargs method
0d8ec1d [Cheng Lian] Adds more test cases
b35c8c6 [Cheng Lian] Fixes some typos and outdated comments
dd704fd [Cheng Lian] Fixes Python Parquet API
596c312 [Cheng Lian] Uses switch to control whether use Parquet data source or not
7d0f7a2 [Cheng Lian] Fixes Metastore Parquet table conversion
a1896c7 [Cheng Lian] Fixes all existing Parquet test suites except for ParquetMetastoreSuite
5654c9d [Cheng Lian] Draft version of Parquet partition discovery and schema merging


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a9ed5117
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a9ed5117
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a9ed5117

Branch: refs/heads/master
Commit: a9ed51178c89d83aae1ad420fb3f4a7f4d1812ec
Parents: c19152c
Author: Cheng Lian <li...@databricks.com>
Authored: Thu Feb 5 15:29:56 2015 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Thu Feb 5 15:29:56 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  17 +-
 python/pyspark/sql.py                           |   9 +-
 .../sql/catalyst/expressions/predicates.scala   |   7 +-
 .../org/apache/spark/sql/types/dataTypes.scala  |  68 +-
 .../org/apache/spark/sql/DataFrameImpl.scala    |   6 +-
 .../scala/org/apache/spark/sql/SQLConf.scala    |   5 +
 .../scala/org/apache/spark/sql/SQLContext.scala |  20 +-
 .../spark/sql/execution/SparkStrategies.scala   |   9 +-
 .../apache/spark/sql/json/JSONRelation.scala    |   2 +-
 .../spark/sql/parquet/ParquetTableSupport.scala |   9 +-
 .../apache/spark/sql/parquet/ParquetTypes.scala |  17 +-
 .../apache/spark/sql/parquet/newParquet.scala   | 753 +++++++++++++++----
 .../spark/sql/sources/DataSourceStrategy.scala  |   8 +-
 .../org/apache/spark/sql/sources/ddl.scala      |  20 +-
 .../apache/spark/sql/sources/interfaces.scala   |   2 +-
 .../spark/sql/parquet/ParquetFilterSuite.scala  | 375 ++++-----
 .../spark/sql/parquet/ParquetIOSuite.scala      | 353 ++++-----
 .../ParquetPartitionDiscoverySuite.scala        | 126 ++++
 .../spark/sql/parquet/ParquetQuerySuite.scala   | 127 ++--
 .../spark/sql/parquet/ParquetSchemaSuite.scala  |  37 +
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  20 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  |   7 +-
 .../spark/sql/parquet/HiveParquetSuite.scala    |  78 +-
 .../spark/sql/parquet/parquetSuites.scala       | 202 ++---
 24 files changed, 1541 insertions(+), 736 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index d688542..03238e9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -21,7 +21,7 @@ import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.fs.FileSystem.Statistics
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
@@ -191,6 +191,21 @@ class SparkHadoopUtil extends Logging {
     val method = context.getClass.getMethod("getConfiguration")
     method.invoke(context).asInstanceOf[Configuration]
   }
+
+  /**
+   * Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
+   * given path points to a file, return a single-element collection containing [[FileStatus]] of
+   * that file.
+   */
+  def listLeafStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
+    def recurse(path: Path) = {
+      val (directories, leaves) = fs.listStatus(path).partition(_.isDir)
+      leaves ++ directories.flatMap(f => listLeafStatuses(fs, f.getPath))
+    }
+
+    val baseStatus = fs.getFileStatus(basePath)
+    if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
+  }
 }
 
 object SparkHadoopUtil {
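
A small usage sketch of the new helper (the path is hypothetical, and this assumes `SparkHadoopUtil` is accessible from the calling code); it mirrors how `newParquet.scala` calls it further down in this diff.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.deploy.SparkHadoopUtil

// `sc` is an existing SparkContext; lists every data file under the base path.
val fs = FileSystem.get(sc.hadoopConfiguration)
val leaves = SparkHadoopUtil.get.listLeafStatuses(fs, new Path("/data/events"))
leaves.foreach(status => println(s"${status.getPath} (${status.getLen} bytes)"))
```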

http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/python/pyspark/sql.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 3ac8ea5..e55f285 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -1471,7 +1471,7 @@ class SQLContext(object):
         else:
             raise ValueError("Can only register DataFrame as table")
 
-    def parquetFile(self, path):
+    def parquetFile(self, *paths):
         """Loads a Parquet file, returning the result as a L{DataFrame}.
 
         >>> import tempfile, shutil
@@ -1483,7 +1483,12 @@ class SQLContext(object):
         >>> sorted(df.collect()) == sorted(df2.collect())
         True
         """
-        jdf = self._ssql_ctx.parquetFile(path)
+        gateway = self._sc._gateway
+        jpath = paths[0]
+        jpaths = gateway.new_array(gateway.jvm.java.lang.String, len(paths) - 1)
+        for i in range(1, len(paths)):
+            jpaths[i] = paths[i]
+        jdf = self._ssql_ctx.parquetFile(jpath, jpaths)
         return DataFrame(jdf, self)
 
     def jsonFile(self, path, schema=None, samplingRatio=1.0):

http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index c84cc95..365b168 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.catalyst.analysis.UnresolvedException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.types.BooleanType
+import org.apache.spark.sql.types.{BinaryType, BooleanType}
 
 object InterpretedPredicate {
   def apply(expression: Expression, inputSchema: Seq[Attribute]): (Row => Boolean) =
@@ -175,7 +175,10 @@ case class EqualTo(left: Expression, right: Expression) extends BinaryComparison
       null
     } else {
       val r = right.eval(input)
-      if (r == null) null else l == r
+      if (r == null) null
+      else if (left.dataType != BinaryType) l == r
+      else BinaryType.ordering.compare(
+        l.asInstanceOf[Array[Byte]], r.asInstanceOf[Array[Byte]]) == 0
     }
   }
 }
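
For context on the new `BinaryType` branch: JVM arrays compare by reference, so the previous `l == r` check evaluated to false even when two byte arrays held identical contents.  A quick plain-Scala illustration:

```scala
val a = Array[Byte](1, 2, 3)
val b = Array[Byte](1, 2, 3)
println(a == b)                         // false: Array equality is reference equality
println(java.util.Arrays.equals(a, b))  // true: element-wise comparison
// The patched EqualTo therefore compares BinaryType values with
// BinaryType.ordering.compare(l, r) == 0 instead of l == r.
```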

http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
index be362be..91efe32 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.types
 
 import java.sql.Timestamp
 
+import scala.collection.mutable.ArrayBuffer
 import scala.math.Numeric.{FloatAsIfIntegral, DoubleAsIfIntegral}
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.{TypeTag, runtimeMirror, typeTag}
@@ -29,6 +30,7 @@ import org.json4s.JsonAST.JValue
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
+import org.apache.spark.SparkException
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.ScalaReflectionLock
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
@@ -159,7 +161,6 @@ object DataType {
       case failure: NoSuccess =>
         throw new IllegalArgumentException(s"Unsupported dataType: $asString, $failure")
     }
-
   }
 
   protected[types] def buildFormattedString(
@@ -754,6 +755,57 @@ object StructType {
   def apply(fields: java.util.List[StructField]): StructType = {
     StructType(fields.toArray.asInstanceOf[Array[StructField]])
   }
+
+  private[sql] def merge(left: DataType, right: DataType): DataType =
+    (left, right) match {
+      case (ArrayType(leftElementType, leftContainsNull),
+            ArrayType(rightElementType, rightContainsNull)) =>
+        ArrayType(
+          merge(leftElementType, rightElementType),
+          leftContainsNull || rightContainsNull)
+
+      case (MapType(leftKeyType, leftValueType, leftContainsNull),
+            MapType(rightKeyType, rightValueType, rightContainsNull)) =>
+        MapType(
+          merge(leftKeyType, rightKeyType),
+          merge(leftValueType, rightValueType),
+          leftContainsNull || rightContainsNull)
+
+      case (StructType(leftFields), StructType(rightFields)) =>
+        val newFields = ArrayBuffer.empty[StructField]
+
+        leftFields.foreach {
+          case leftField @ StructField(leftName, leftType, leftNullable, _) =>
+            rightFields
+              .find(_.name == leftName)
+              .map { case rightField @ StructField(_, rightType, rightNullable, _) =>
+                leftField.copy(
+                  dataType = merge(leftType, rightType),
+                  nullable = leftNullable || rightNullable)
+              }
+              .orElse(Some(leftField))
+              .foreach(newFields += _)
+        }
+
+        rightFields
+          .filterNot(f => leftFields.map(_.name).contains(f.name))
+          .foreach(newFields += _)
+
+        StructType(newFields)
+
+      case (DecimalType.Fixed(leftPrecision, leftScale),
+            DecimalType.Fixed(rightPrecision, rightScale)) =>
+        DecimalType(leftPrecision.max(rightPrecision), leftScale.max(rightScale))
+
+      case (leftUdt: UserDefinedType[_], rightUdt: UserDefinedType[_])
+        if leftUdt.userClass == rightUdt.userClass => leftUdt
+
+      case (leftType, rightType) if leftType == rightType =>
+        leftType
+
+      case _ =>
+        throw new SparkException(s"Failed to merge incompatible data types $left and $right")
+    }
 }
 
 
@@ -890,6 +942,20 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
     val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.simpleString}")
     s"struct<${fieldTypes.mkString(",")}>"
   }
+
+  /**
+   * Merges with another schema (`StructType`).  For a struct field A from `this` and a struct field
+   * B from `that`,
+   *
+   * 1. If A and B have the same name and data type, they are merged to a field C with the same name
+   *    and data type.  C is nullable if and only if either A or B is nullable.
+   * 2. If A doesn't exist in `that`, it's included in the result schema.
+   * 3. If B doesn't exist in `this`, it's also included in the result schema.
+   * 4. Otherwise, `this` and `that` are considered as conflicting schemas and an exception would be
+   *    thrown.
+   */
+  private[sql] def merge(that: StructType): StructType =
+    StructType.merge(this, that).asInstanceOf[StructType]
 }
 
 
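
An illustrative sketch of the documented merge semantics (the schemas are hypothetical, and `merge` is `private[sql]`, so the call itself is shown only in a comment):

```scala
import org.apache.spark.sql.types._

val left = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true)))
val right = StructType(Seq(
  StructField("id", LongType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

// Per rules 1-3 above, left.merge(right) would yield:
//   StructType(Seq(
//     StructField("id", LongType, nullable = true),      // nullable = false || true
//     StructField("name", StringType, nullable = true),  // only in `left`
//     StructField("age", IntegerType, nullable = true))) // only in `right`
```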

http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index d6df927..58d1175 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -295,7 +295,11 @@ private[sql] class DataFrameImpl protected[sql](
   }
 
   override def saveAsParquetFile(path: String): Unit = {
-    sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
+    if (sqlContext.conf.parquetUseDataSourceApi) {
+      save("org.apache.spark.sql.parquet", "path" -> path)
+    } else {
+      sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
+    }
   }
 
   override def saveAsTable(tableName: String): Unit = {
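
A one-line usage sketch (hypothetical destination path): with the new flag at its default of `true`, this call now routes through the data source write path rather than the old `WriteToFile` plan.

```scala
// `df` is an existing DataFrame.
df.saveAsParquetFile("hdfs://namenode:8020/tmp/events_copy")
```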

http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 7fe1794..5ef3bb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -37,6 +37,7 @@ private[spark] object SQLConf {
   val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
   val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
   val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
+  val PARQUET_USE_DATA_SOURCE_API = "spark.sql.parquet.useDataSourceApi"
 
   val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
   val BROADCAST_TIMEOUT = "spark.sql.broadcastTimeout"
@@ -105,6 +106,10 @@ private[sql] class SQLConf extends Serializable {
   private[spark] def parquetFilterPushDown =
     getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean
 
+  /** When true uses Parquet implementation based on data source API */
+  private[spark] def parquetUseDataSourceApi =
+    getConf(PARQUET_USE_DATA_SOURCE_API, "true").toBoolean
+
   /** When true the planner will use the external sort, which may spill to disk. */
   private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "false").toBoolean
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 01620aa..706ef6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -20,14 +20,13 @@ package org.apache.spark.sql
 import java.beans.Introspector
 import java.util.Properties
 
-import scala.collection.immutable
 import scala.collection.JavaConversions._
+import scala.collection.immutable
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 
-import org.apache.spark.{SparkContext, Partition}
 import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
-import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
+import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis._
@@ -36,11 +35,12 @@ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.json._
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.sources._
+import org.apache.spark.sql.json._
+import org.apache.spark.sql.sources.{BaseRelation, DDLParser, DataSourceStrategy, LogicalRelation, _}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
+import org.apache.spark.{Partition, SparkContext}
 
 /**
  * :: AlphaComponent ::
@@ -303,8 +303,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *
    * @group userf
    */
-  def parquetFile(path: String): DataFrame =
-    DataFrame(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration), this))
+  @scala.annotation.varargs
+  def parquetFile(path: String, paths: String*): DataFrame =
+    if (conf.parquetUseDataSourceApi) {
+      baseRelationToDataFrame(parquet.ParquetRelation2(path +: paths, Map.empty)(this))
+    } else {
+      DataFrame(this, parquet.ParquetRelation(
+        paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))
+    }
 
   /**
    * Loads a JSON file (one object per line), returning the result as a [[DataFrame]].
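
A sketch of the new varargs overload (paths hypothetical): several Parquet locations can now be read into a single DataFrame.

```scala
val df = sqlContext.parquetFile(
  "hdfs://namenode:8020/data/events-2015-01",
  "hdfs://namenode:8020/data/events-2015-02")
println(df.count())
```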

http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index f06f5fd..81bcf5a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,18 +17,17 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.{SQLContext, Strategy, execution}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
+import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
 import org.apache.spark.sql.parquet._
+import org.apache.spark.sql.sources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.sources.{DescribeCommand => LogicalDescribeCommand}
-import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
-import org.apache.spark.sql.sources._
+import org.apache.spark.sql.{SQLContext, Strategy, execution}
 
 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   self: SQLContext#SparkPlanner =>

http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index 8372dec..f27585d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.types.StructType
 
 
 private[sql] class DefaultSource
-  extends RelationProvider with SchemaRelationProvider with CreateableRelationProvider {
+  extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {
 
   /** Returns a new base relation with the parameters. */
   override def createRelation(

http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 14c81ae..19bfba3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -159,7 +159,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
     val attributesSize = attributes.size
     if (attributesSize > record.size) {
       throw new IndexOutOfBoundsException(
-        s"Trying to write more fields than contained in row (${attributesSize}>${record.size})")
+        s"Trying to write more fields than contained in row ($attributesSize > ${record.size})")
     }
 
     var index = 0
@@ -325,7 +325,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
     val attributesSize = attributes.size
     if (attributesSize > record.size) {
       throw new IndexOutOfBoundsException(
-        s"Trying to write more fields than contained in row (${attributesSize}>${record.size})")
+        s"Trying to write more fields than contained in row ($attributesSize > ${record.size})")
     }
 
     var index = 0
@@ -348,10 +348,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
       index: Int): Unit = {
     ctype match {
       case StringType => writer.addBinary(
-        Binary.fromByteArray(
-          record(index).asInstanceOf[String].getBytes("utf-8")
-        )
-      )
+        Binary.fromByteArray(record(index).asInstanceOf[String].getBytes("utf-8")))
       case BinaryType => writer.addBinary(
         Binary.fromByteArray(record(index).asInstanceOf[Array[Byte]]))
       case IntegerType => writer.addInteger(record.getInt(index))

http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index b646109..5209581 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -19,24 +19,23 @@ package org.apache.spark.sql.parquet
 
 import java.io.IOException
 
+import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapreduce.Job
-
 import parquet.format.converter.ParquetMetadataConverter
-import parquet.hadoop.{ParquetFileReader, Footer, ParquetFileWriter}
-import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData}
+import parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
 import parquet.hadoop.util.ContextUtil
-import parquet.schema.{Type => ParquetType, Types => ParquetTypes, PrimitiveType => ParquetPrimitiveType, MessageType}
-import parquet.schema.{GroupType => ParquetGroupType, OriginalType => ParquetOriginalType, ConversionPatterns, DecimalMetadata}
+import parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
 import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName}
 import parquet.schema.Type.Repetition
+import parquet.schema.{ConversionPatterns, DecimalMetadata, GroupType => ParquetGroupType, MessageType, OriginalType => ParquetOriginalType, PrimitiveType => ParquetPrimitiveType, Type => ParquetType, Types => ParquetTypes}
 
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.types._
+import org.apache.spark.{Logging, SparkException}
 
 // Implicits
 import scala.collection.JavaConversions._
@@ -285,7 +284,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
       ctype: DataType,
       name: String,
       nullable: Boolean = true,
-      inArray: Boolean = false, 
+      inArray: Boolean = false,
       toThriftSchemaNames: Boolean = false): ParquetType = {
     val repetition =
       if (inArray) {
@@ -340,7 +339,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
         }
         case StructType(structFields) => {
           val fields = structFields.map {
-            field => fromDataType(field.dataType, field.name, field.nullable, 
+            field => fromDataType(field.dataType, field.name, field.nullable,
                                   inArray = false, toThriftSchemaNames)
           }
           new ParquetGroupType(repetition, name, fields.toSeq)

http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 179c0d6..49d4633 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -16,25 +16,38 @@
  */
 package org.apache.spark.sql.parquet
 
-import java.util.{List => JList}
+import java.io.IOException
+import java.lang.{Double => JDouble, Float => JFloat, Long => JLong}
+import java.math.{BigDecimal => JBigDecimal}
+import java.text.SimpleDateFormat
+import java.util.{List => JList, Date}
 
 import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
 
-import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.hadoop.mapreduce.{InputSplit, Job, JobContext}
 import parquet.filter2.predicate.FilterApi
-import parquet.hadoop.ParquetInputFormat
+import parquet.format.converter.ParquetMetadataConverter
+import parquet.hadoop.{ParquetInputFormat, _}
 import parquet.hadoop.util.ContextUtil
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.{NewHadoopPartition, RDD}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
+import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.parquet.ParquetTypesConverter._
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
-import org.apache.spark.sql.{Row, SQLConf, SQLContext}
-import org.apache.spark.{Logging, Partition => SparkPartition}
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType, _}
+import org.apache.spark.sql.types.StructType._
+import org.apache.spark.sql.{DataFrame, Row, SQLConf, SQLContext}
+import org.apache.spark.{Partition => SparkPartition, TaskContext, SerializableWritable, Logging, SparkException}
 
 
 /**
@@ -43,19 +56,49 @@ import org.apache.spark.{Logging, Partition => SparkPartition}
  * required is `path`, which should be the location of a collection of, optionally partitioned,
  * parquet files.
  */
-class DefaultSource extends RelationProvider {
+class DefaultSource
+    extends RelationProvider
+    with SchemaRelationProvider
+    with CreatableRelationProvider {
+  private def checkPath(parameters: Map[String, String]): String = {
+    parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables."))
+  }
+
   /** Returns a new base relation with the given parameters. */
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String]): BaseRelation = {
-    val path =
-      parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables."))
+    ParquetRelation2(Seq(checkPath(parameters)), parameters, None)(sqlContext)
+  }
 
-    ParquetRelation2(path)(sqlContext)
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      schema: StructType): BaseRelation = {
+    ParquetRelation2(Seq(checkPath(parameters)), parameters, Some(schema))(sqlContext)
+  }
+
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      data: DataFrame): BaseRelation = {
+    val path = checkPath(parameters)
+    ParquetRelation.createEmpty(
+      path,
+      data.schema.toAttributes,
+      false,
+      sqlContext.sparkContext.hadoopConfiguration,
+      sqlContext)
+
+    val relation = createRelation(sqlContext, parameters, data.schema)
+    relation.asInstanceOf[ParquetRelation2].insert(data, true)
+    relation
   }
 }
 
-private[parquet] case class Partition(partitionValues: Map[String, Any], files: Seq[FileStatus])
+private[parquet] case class Partition(values: Row, path: String)
+
+private[parquet] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition])
 
 /**
  * An alternative to [[ParquetRelation]] that plugs in using the data sources API.  This class is
@@ -81,117 +124,196 @@ private[parquet] case class Partition(partitionValues: Map[String, Any], files:
  * discovery.
  */
 @DeveloperApi
-case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
-  extends CatalystScan with Logging {
+case class ParquetRelation2
+    (paths: Seq[String], parameters: Map[String, String], maybeSchema: Option[StructType] = None)
+    (@transient val sqlContext: SQLContext)
+  extends CatalystScan
+  with InsertableRelation
+  with SparkHadoopMapReduceUtil
+  with Logging {
+
+  // Should we merge schemas from all Parquet part-files?
+  private val shouldMergeSchemas =
+    parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean
+
+  // Optional Metastore schema, used when converting Hive Metastore Parquet table
+  private val maybeMetastoreSchema =
+    parameters
+      .get(ParquetRelation2.METASTORE_SCHEMA)
+      .map(s => DataType.fromJson(s).asInstanceOf[StructType])
+
+  // Hive uses this as part of the default partition name when the partition column value is null
+  // or empty string
+  private val defaultPartitionName = parameters.getOrElse(
+    ParquetRelation2.DEFAULT_PARTITION_NAME, "__HIVE_DEFAULT_PARTITION__")
+
+  override def equals(other: Any) = other match {
+    case relation: ParquetRelation2 =>
+      paths.toSet == relation.paths.toSet &&
+        maybeMetastoreSchema == relation.maybeMetastoreSchema &&
+        (shouldMergeSchemas == relation.shouldMergeSchemas || schema == relation.schema)
+  }
 
-  def sparkContext = sqlContext.sparkContext
+  private[sql] def sparkContext = sqlContext.sparkContext
 
-  // Minor Hack: scala doesnt seem to respect @transient for vals declared via extraction
-  @transient
-  private var partitionKeys: Seq[String] = _
-  @transient
-  private var partitions: Seq[Partition] = _
-  discoverPartitions()
+  @transient private val fs = FileSystem.get(sparkContext.hadoopConfiguration)
 
-  // TODO: Only finds the first partition, assumes the key is of type Integer...
-  private def discoverPartitions() = {
-    val fs = FileSystem.get(new java.net.URI(path), sparkContext.hadoopConfiguration)
-    val partValue = "([^=]+)=([^=]+)".r
+  private class MetadataCache {
+    private var metadataStatuses: Array[FileStatus] = _
+    private var commonMetadataStatuses: Array[FileStatus] = _
+    private var footers: Map[FileStatus, Footer] = _
+    private var parquetSchema: StructType = _
 
-    val childrenOfPath = fs.listStatus(new Path(path)).filterNot(_.getPath.getName.startsWith("_"))
-    val childDirs = childrenOfPath.filter(s => s.isDir)
+    var dataStatuses: Array[FileStatus] = _
+    var partitionSpec: PartitionSpec = _
+    var schema: StructType = _
+    var dataSchemaIncludesPartitionKeys: Boolean = _
 
-    if (childDirs.size > 0) {
-      val partitionPairs = childDirs.map(_.getPath.getName).map {
-        case partValue(key, value) => (key, value)
+    def refresh(): Unit = {
+      val baseStatuses = {
+        val statuses = paths.distinct.map(p => fs.getFileStatus(fs.makeQualified(new Path(p))))
+        // Support either reading a collection of raw Parquet part-files, or a collection of folders
+        // containing Parquet files (e.g. partitioned Parquet table).
+        assert(statuses.forall(!_.isDir) || statuses.forall(_.isDir))
+        statuses.toArray
       }
 
-      val foundKeys = partitionPairs.map(_._1).distinct
-      if (foundKeys.size > 1) {
-        sys.error(s"Too many distinct partition keys: $foundKeys")
+      val leaves = baseStatuses.flatMap { f =>
+        val statuses = SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
+          isSummaryFile(f.getPath) ||
+            !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
+        }
+        assert(statuses.nonEmpty, s"${f.getPath} is an empty folder.")
+        statuses
       }
 
-      // Do a parallel lookup of partition metadata.
-      val partitionFiles =
-        childDirs.par.map { d =>
-          fs.listStatus(d.getPath)
-            // TODO: Is there a standard hadoop function for this?
-            .filterNot(_.getPath.getName.startsWith("_"))
-            .filterNot(_.getPath.getName.startsWith("."))
-        }.seq
-
-      partitionKeys = foundKeys.toSeq
-      partitions = partitionFiles.zip(partitionPairs).map { case (files, (key, value)) =>
-        Partition(Map(key -> value.toInt), files)
-      }.toSeq
-    } else {
-      partitionKeys = Nil
-      partitions = Partition(Map.empty, childrenOfPath) :: Nil
-    }
-  }
+      dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
+      metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
+      commonMetadataStatuses =
+        leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
+
+      footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
+        val parquetMetadata = ParquetFileReader.readFooter(
+          sparkContext.hadoopConfiguration, f, ParquetMetadataConverter.NO_FILTER)
+        f -> new Footer(f.getPath, parquetMetadata)
+      }.seq.toMap
+
+      partitionSpec = {
+        val partitionDirs = dataStatuses
+          .filterNot(baseStatuses.contains)
+          .map(_.getPath.getParent)
+          .distinct
+
+        if (partitionDirs.nonEmpty) {
+          ParquetRelation2.parsePartitions(partitionDirs, defaultPartitionName)
+        } else {
+          // No partition directories found, makes an empty specification
+          PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[Partition])
+        }
+      }
 
-  override val sizeInBytes = partitions.flatMap(_.files).map(_.getLen).sum
+      parquetSchema = maybeSchema.getOrElse(readSchema())
 
-  val dataSchema = StructType.fromAttributes( // TODO: Parquet code should not deal with attributes.
-    ParquetTypesConverter.readSchemaFromFile(
-      partitions.head.files.head.getPath,
-      Some(sparkContext.hadoopConfiguration),
-      sqlContext.conf.isParquetBinaryAsString,
-      sqlContext.conf.isParquetINT96AsTimestamp))
+      dataSchemaIncludesPartitionKeys =
+        isPartitioned &&
+          partitionColumns.forall(f => metadataCache.parquetSchema.fieldNames.contains(f.name))
 
-  val dataIncludesKey =
-    partitionKeys.headOption.map(dataSchema.fieldNames.contains(_)).getOrElse(true)
+      schema = {
+        val fullParquetSchema = if (dataSchemaIncludesPartitionKeys) {
+          metadataCache.parquetSchema
+        } else {
+          StructType(metadataCache.parquetSchema.fields ++ partitionColumns.fields)
+        }
 
-  override val schema =
-    if (dataIncludesKey) {
-      dataSchema
-    } else {
-      StructType(dataSchema.fields :+ StructField(partitionKeys.head, IntegerType))
+        maybeMetastoreSchema
+          .map(ParquetRelation2.mergeMetastoreParquetSchema(_, fullParquetSchema))
+          .getOrElse(fullParquetSchema)
+      }
     }
 
-  override def buildScan(output: Seq[Attribute], predicates: Seq[Expression]): RDD[Row] = {
-    // This is mostly a hack so that we can use the existing parquet filter code.
-    val requiredColumns = output.map(_.name)
+    private def readSchema(): StructType = {
+      // Sees which file(s) we need to touch in order to figure out the schema.
+      val filesToTouch =
+      // Always tries the summary files first if users don't require a merged schema.  In this case,
+      // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
+      // groups information, and could be much smaller for large Parquet files with lots of row
+      // groups.
+      //
+      // NOTE: Metadata stored in the summary files are merged from all part-files.  However, for
+      // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
+      // how to merge them correctly if some key is associated with different values in different
+      // part-files.  When this happens, Parquet simply gives up generating the summary file.  This
+      // implies that if a summary file presents, then:
+      //
+      //   1. Either all part-files have exactly the same Spark SQL schema, or
+      //   2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
+      //      their schemas may differ from each other).
+      //
+      // Here we tend to be pessimistic and take the second case into account.  Basically this means
+      // we can't trust the summary files if users require a merged schema, and must touch all part-
+      // files to do the merge.
+        if (shouldMergeSchemas) {
+          // Also includes summary files, 'cause there might be empty partition directories.
+          (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
+        } else {
+          // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
+          // don't have this.
+          commonMetadataStatuses.headOption
+            // Falls back to "_metadata"
+            .orElse(metadataStatuses.headOption)
+            // Summary file(s) not found, the Parquet file is either corrupted, or different part-
+            // files contain conflicting user defined metadata (two or more values are associated
+            // with a same key in different files).  In either case, we fall back to any of the
+            // first part-file, and just assume all schemas are consistent.
+            .orElse(dataStatuses.headOption)
+            .toSeq
+        }
 
-    val job = new Job(sparkContext.hadoopConfiguration)
-    ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
-    val jobConf: Configuration = ContextUtil.getConfiguration(job)
+      ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
+    }
+  }
 
-    val requestedSchema = StructType(requiredColumns.map(schema(_)))
+  @transient private val metadataCache = new MetadataCache
+  metadataCache.refresh()
 
-    val partitionKeySet = partitionKeys.toSet
-    val rawPredicate =
-      predicates
-        .filter(_.references.map(_.name).toSet.subsetOf(partitionKeySet))
-        .reduceOption(And)
-        .getOrElse(Literal(true))
+  private def partitionColumns = metadataCache.partitionSpec.partitionColumns
 
-    // Translate the predicate so that it reads from the information derived from the
-    // folder structure
-    val castedPredicate = rawPredicate transform {
-      case a: AttributeReference =>
-        val idx = partitionKeys.indexWhere(a.name == _)
-        BoundReference(idx, IntegerType, nullable = true)
-    }
+  private def partitions = metadataCache.partitionSpec.partitions
 
-    val inputData = new GenericMutableRow(partitionKeys.size)
-    val pruningCondition = InterpretedPredicate(castedPredicate)
+  private def isPartitioned = partitionColumns.nonEmpty
 
-    val selectedPartitions =
-      if (partitionKeys.nonEmpty && predicates.nonEmpty) {
-        partitions.filter { part =>
-          inputData(0) = part.partitionValues.values.head
-          pruningCondition(inputData)
-        }
-      } else {
-        partitions
+  private def dataSchemaIncludesPartitionKeys = metadataCache.dataSchemaIncludesPartitionKeys
+
+  override def schema = metadataCache.schema
+
+  private def isSummaryFile(file: Path): Boolean = {
+    file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
+      file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
+  }
+
+  // TODO Should calculate per scan size
+  // It's common that a query only scans a fraction of a large Parquet file.  Returning size of the
+  // whole Parquet file disables some optimizations in this case (e.g. broadcast join).
+  override val sizeInBytes = metadataCache.dataStatuses.map(_.getLen).sum
+
+  // This is mostly a hack so that we can use the existing parquet filter code.
+  override def buildScan(output: Seq[Attribute], predicates: Seq[Expression]): RDD[Row] = {
+    val job = new Job(sparkContext.hadoopConfiguration)
+    ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
+    val jobConf: Configuration = ContextUtil.getConfiguration(job)
+
+    val selectedPartitions = prunePartitions(predicates, partitions)
+    val selectedFiles = if (isPartitioned) {
+      selectedPartitions.flatMap { p =>
+        metadataCache.dataStatuses.filter(_.getPath.getParent.toString == p.path)
       }
+    } else {
+      metadataCache.dataStatuses.toSeq
+    }
 
-    val fs = FileSystem.get(new java.net.URI(path), sparkContext.hadoopConfiguration)
-    val selectedFiles = selectedPartitions.flatMap(_.files).map(f => fs.makeQualified(f.getPath))
     // FileInputFormat cannot handle empty lists.
     if (selectedFiles.nonEmpty) {
-      org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, selectedFiles: _*)
+      FileInputFormat.setInputPaths(job, selectedFiles.map(_.getPath): _*)
     }
 
     // Push down filters when possible. Notice that not all filters can be converted to Parquet
@@ -203,23 +325,28 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
       .filter(_ => sqlContext.conf.parquetFilterPushDown)
       .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
 
-    def percentRead = selectedPartitions.size.toDouble / partitions.size.toDouble * 100
-    logInfo(s"Reading $percentRead% of $path partitions")
+    if (isPartitioned) {
+      def percentRead = selectedPartitions.size.toDouble / partitions.size.toDouble * 100
+      logInfo(s"Reading $percentRead% of partitions")
+    }
+
+    val requiredColumns = output.map(_.name)
+    val requestedSchema = StructType(requiredColumns.map(schema(_)))
 
     // Store both requested and original schema in `Configuration`
     jobConf.set(
       RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-      ParquetTypesConverter.convertToString(requestedSchema.toAttributes))
+      convertToString(requestedSchema.toAttributes))
     jobConf.set(
       RowWriteSupport.SPARK_ROW_SCHEMA,
-      ParquetTypesConverter.convertToString(schema.toAttributes))
+      convertToString(schema.toAttributes))
 
     // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
     val useCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
     jobConf.set(SQLConf.PARQUET_CACHE_METADATA, useCache.toString)
 
     val baseRDD =
-      new org.apache.spark.rdd.NewHadoopRDD(
+      new NewHadoopRDD(
           sparkContext,
           classOf[FilteringParquetRowInputFormat],
           classOf[Void],
@@ -228,66 +355,400 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
         val cacheMetadata = useCache
 
         @transient
-        val cachedStatus = selectedPartitions.flatMap(_.files)
+        val cachedStatus = selectedFiles
 
         // Overridden so we can inject our own cached files statuses.
         override def getPartitions: Array[SparkPartition] = {
-          val inputFormat =
-            if (cacheMetadata) {
-              new FilteringParquetRowInputFormat {
-                override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
-              }
-            } else {
-              new FilteringParquetRowInputFormat
+          val inputFormat = if (cacheMetadata) {
+            new FilteringParquetRowInputFormat {
+              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
             }
-
-          inputFormat match {
-            case configurable: Configurable =>
-              configurable.setConf(getConf)
-            case _ =>
+          } else {
+            new FilteringParquetRowInputFormat
           }
+
           val jobContext = newJobContext(getConf, jobId)
-          val rawSplits = inputFormat.getSplits(jobContext).toArray
-          val result = new Array[SparkPartition](rawSplits.size)
-          for (i <- 0 until rawSplits.size) {
-            result(i) =
-              new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+          val rawSplits = inputFormat.getSplits(jobContext)
+
+          Array.tabulate[SparkPartition](rawSplits.size) { i =>
+            new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
           }
-          result
         }
       }
 
-    // The ordinal for the partition key in the result row, if requested.
-    val partitionKeyLocation =
-      partitionKeys
-        .headOption
-        .map(requiredColumns.indexOf(_))
-        .getOrElse(-1)
+    // The ordinals for partition keys in the result row, if requested.
+    val partitionKeyLocations = partitionColumns.fieldNames.zipWithIndex.map {
+      case (name, index) => index -> requiredColumns.indexOf(name)
+    }.toMap.filter {
+      case (_, index) => index >= 0
+    }
 
     // When the data does not include the key and the key is requested then we must fill it in
     // based on information from the input split.
-    if (!dataIncludesKey && partitionKeyLocation != -1) {
-      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
-        val partValue = "([^=]+)=([^=]+)".r
-        val partValues =
-          split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
-            .getPath
-            .toString
-            .split("/")
-            .flatMap {
-            case partValue(key, value) => Some(key -> value)
-            case _ => None
-          }.toMap
-
-        val currentValue = partValues.values.head.toInt
-        iter.map { pair =>
-          val res = pair._2.asInstanceOf[SpecificMutableRow]
-          res.setInt(partitionKeyLocation, currentValue)
-          res
+    if (!dataSchemaIncludesPartitionKeys && partitionKeyLocations.nonEmpty) {
+      baseRDD.mapPartitionsWithInputSplit { case (split: ParquetInputSplit, iterator) =>
+        val partValues = selectedPartitions.collectFirst {
+          case p if split.getPath.getParent.toString == p.path => p.values
+        }.get
+
+        iterator.map { pair =>
+          val row = pair._2.asInstanceOf[SpecificMutableRow]
+          var i = 0
+          while (i < partValues.size) {
+            // TODO Avoids boxing cost here!
+            row.update(partitionKeyLocations(i), partValues(i))
+            i += 1
+          }
+          row
         }
       }
     } else {
       baseRDD.map(_._2)
     }
   }
+
+  private def prunePartitions(
+      predicates: Seq[Expression],
+      partitions: Seq[Partition]): Seq[Partition] = {
+    val partitionColumnNames = partitionColumns.map(_.name).toSet
+    val partitionPruningPredicates = predicates.filter {
+      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+    }
+
+    val rawPredicate = partitionPruningPredicates.reduceOption(And).getOrElse(Literal(true))
+    val boundPredicate = InterpretedPredicate(rawPredicate transform {
+      case a: AttributeReference =>
+        val index = partitionColumns.indexWhere(a.name == _.name)
+        BoundReference(index, partitionColumns(index).dataType, nullable = true)
+    })
+
+    if (isPartitioned && partitionPruningPredicates.nonEmpty) {
+      partitions.filter(p => boundPredicate(p.values))
+    } else {
+      partitions
+    }
+  }
+
+  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+    // TODO: currently we do not check whether the "schema"s are compatible
+    // That means if one first creates a table and then INSERTs data with
+    // and incompatible schema the execution will fail. It would be nice
+    // to catch this early one, maybe having the planner validate the schema
+    // before calling execute().
+
+    val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
+    val writeSupport = if (schema.map(_.dataType).forall(_.isPrimitive)) {
+      log.debug("Initializing MutableRowWriteSupport")
+      classOf[MutableRowWriteSupport]
+    } else {
+      classOf[RowWriteSupport]
+    }
+
+    ParquetOutputFormat.setWriteSupportClass(job, writeSupport)
+
+    val conf = ContextUtil.getConfiguration(job)
+    RowWriteSupport.setSchema(schema.toAttributes, conf)
+
+    val destinationPath = new Path(paths.head)
+
+    if (overwrite) {
+      try {
+        destinationPath.getFileSystem(conf).delete(destinationPath, true)
+      } catch {
+        case e: IOException =>
+          throw new IOException(
+            s"Unable to clear output directory ${destinationPath.toString} prior" +
+              s" to writing to Parquet file:\n${e.toString}")
+      }
+    }
+
+    job.setOutputKeyClass(classOf[Void])
+    job.setOutputValueClass(classOf[Row])
+    FileOutputFormat.setOutputPath(job, destinationPath)
+
+    val wrappedConf = new SerializableWritable(job.getConfiguration)
+    val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmm").format(new Date())
+    val stageId = sqlContext.sparkContext.newRddId()
+
+    val taskIdOffset = if (overwrite) {
+      1
+    } else {
+      FileSystemHelper.findMaxTaskId(
+        FileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
+    }
+
+    def writeShard(context: TaskContext, iterator: Iterator[Row]): Unit = {
+      /* "reduce task" <split #> <attempt # = spark task #> */
+      val attemptId = newTaskAttemptID(
+        jobTrackerId, stageId, isMap = false, context.partitionId(), context.attemptNumber())
+      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
+      val format = new AppendingParquetOutputFormat(taskIdOffset)
+      val committer = format.getOutputCommitter(hadoopContext)
+      committer.setupTask(hadoopContext)
+      val writer = format.getRecordWriter(hadoopContext)
+      try {
+        while (iterator.hasNext) {
+          val row = iterator.next()
+          writer.write(null, row)
+        }
+      } finally {
+        writer.close(hadoopContext)
+      }
+      committer.commitTask(hadoopContext)
+    }
+    val jobFormat = new AppendingParquetOutputFormat(taskIdOffset)
+    /* apparently we need a TaskAttemptID to construct an OutputCommitter;
+     * however we're only going to use this local OutputCommitter for
+     * setupJob/commitJob, so we just use a dummy "map" task.
+     */
+    val jobAttemptId = newTaskAttemptID(jobTrackerId, stageId, isMap = true, 0, 0)
+    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
+    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
+
+    jobCommitter.setupJob(jobTaskContext)
+    sqlContext.sparkContext.runJob(data.queryExecution.executedPlan.execute(), writeShard _)
+    jobCommitter.commitJob(jobTaskContext)
+
+    metadataCache.refresh()
+  }
+}
+
+object ParquetRelation2 {
+  // Whether we should merge schemas collected from all Parquet part-files.
+  val MERGE_SCHEMA = "mergeSchema"
+
+  // Hive Metastore schema, passed in when the Parquet relation is converted from Metastore
+  val METASTORE_SCHEMA = "metastoreSchema"
+
+  // Default partition name to use when the partition column value is null or empty string
+  val DEFAULT_PARTITION_NAME = "partition.defaultName"
+
+  // When true, the Parquet data source caches Parquet metadata for performance
+  val CACHE_METADATA = "cacheMetadata"
+
+  private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = {
+    footers.map { footer =>
+      val metadata = footer.getParquetMetadata.getFileMetaData
+      val parquetSchema = metadata.getSchema
+      val maybeSparkSchema = metadata
+        .getKeyValueMetaData
+        .toMap
+        .get(RowReadSupport.SPARK_METADATA_KEY)
+        .map(DataType.fromJson(_).asInstanceOf[StructType])
+
+      maybeSparkSchema.getOrElse {
+        // Falls back to Parquet schema if Spark SQL schema is absent.
+        StructType.fromAttributes(
+          // TODO Really no need to use `Attribute` here, we only need to know the data type.
+          convertToAttributes(
+            parquetSchema,
+            sqlContext.conf.isParquetBinaryAsString,
+            sqlContext.conf.isParquetINT96AsTimestamp))
+      }
+    }.reduce { (left, right) =>
+      try left.merge(right) catch { case e: Throwable =>
+        throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
+      }
+    }
+  }
+
+  private[parquet] def mergeMetastoreParquetSchema(
+      metastoreSchema: StructType,
+      parquetSchema: StructType): StructType = {
+    def schemaConflictMessage =
+      s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
+         |${metastoreSchema.prettyJson}
+         |
+         |Parquet schema:
+         |${parquetSchema.prettyJson}
+       """.stripMargin
+
+    assert(metastoreSchema.size == parquetSchema.size, schemaConflictMessage)
+
+    val ordinalMap = metastoreSchema.zipWithIndex.map {
+      case (field, index) => field.name.toLowerCase -> index
+    }.toMap
+    val reorderedParquetSchema = parquetSchema.sortBy(f => ordinalMap(f.name.toLowerCase))
+
+    StructType(metastoreSchema.zip(reorderedParquetSchema).map {
+      // Uses Parquet field names but retains Metastore data types.
+      case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
+        mSchema.copy(name = pSchema.name)
+      case _ =>
+        throw new SparkException(schemaConflictMessage)
+    })
+  }
+
+  // TODO Data source implementations shouldn't touch Catalyst types (`Literal`).
+  // However, we are already using Catalyst expressions for partition pruning and predicate
+  // push-down here...
+  private[parquet] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) {
+    require(columnNames.size == literals.size)
+  }
+
+  /**
+   * Given a group of qualified paths, tries to parse them and returns a partition specification.
+   * For example, given:
+   * {{{
+   *   hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14
+   *   hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28
+   * }}}
+   * it returns:
+   * {{{
+   *   PartitionSpec(
+   *     partitionColumns = StructType(
+   *       StructField(name = "a", dataType = IntegerType, nullable = true),
+   *       StructField(name = "b", dataType = StringType, nullable = true),
+   *       StructField(name = "c", dataType = DoubleType, nullable = true)),
+   *     partitions = Seq(
+   *       Partition(
+   *         values = Row(1, "hello", 3.14),
+   *         path = "hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14"),
+   *       Partition(
+   *         values = Row(2, "world", 6.28),
+   *         path = "hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28")))
+   * }}}
+   */
+  private[parquet] def parsePartitions(
+      paths: Seq[Path],
+      defaultPartitionName: String): PartitionSpec = {
+    val partitionValues = resolvePartitions(paths.map(parsePartition(_, defaultPartitionName)))
+    val fields = {
+      val PartitionValues(columnNames, literals) = partitionValues.head
+      columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
+        StructField(name, dataType, nullable = true)
+      }
+    }
+
+    val partitions = partitionValues.zip(paths).map {
+      case (PartitionValues(_, literals), path) =>
+        Partition(Row(literals.map(_.value): _*), path.toString)
+    }
+
+    PartitionSpec(StructType(fields), partitions)
+  }
+
+  /**
+   * Parses a single partition and returns the column names and values of each partition column.
+   * For example, given:
+   * {{{
+   *   path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
+   * }}}
+   * it returns:
+   * {{{
+   *   PartitionValues(
+   *     Seq("a", "b", "c"),
+   *     Seq(
+   *       Literal(42, IntegerType),
+   *       Literal("hello", StringType),
+   *       Literal(3.14, FloatType)))
+   * }}}
+   */
+  private[parquet] def parsePartition(
+      path: Path,
+      defaultPartitionName: String): PartitionValues = {
+    val columns = ArrayBuffer.empty[(String, Literal)]
+    // Old Hadoop versions don't have `Path.isRoot`
+    var finished = path.getParent == null
+    var chopped = path
+
+    while (!finished) {
+      val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName)
+      maybeColumn.foreach(columns += _)
+      chopped = chopped.getParent
+      finished = maybeColumn.isEmpty || chopped.getParent == null
+    }
+
+    val (columnNames, values) = columns.reverse.unzip
+    PartitionValues(columnNames, values)
+  }
+
+  private def parsePartitionColumn(
+      columnSpec: String,
+      defaultPartitionName: String): Option[(String, Literal)] = {
+    val equalSignIndex = columnSpec.indexOf('=')
+    if (equalSignIndex == -1) {
+      None
+    } else {
+      val columnName = columnSpec.take(equalSignIndex)
+      assert(columnName.nonEmpty, s"Empty partition column name in '$columnSpec'")
+
+      val rawColumnValue = columnSpec.drop(equalSignIndex + 1)
+      assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")
+
+      val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName)
+      Some(columnName -> literal)
+    }
+  }
+
+  /**
+   * Resolves possible type conflicts between partitions by up-casting "lower" types.  The up-
+   * casting order is:
+   * {{{
+   *   NullType ->
+   *   IntegerType -> LongType ->
+   *   FloatType -> DoubleType -> DecimalType.Unlimited ->
+   *   StringType
+   * }}}
+   */
+  private[parquet] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = {
+    val distinctColNamesOfPartitions = values.map(_.columnNames).distinct
+    val columnCount = values.head.columnNames.size
+
+    // Column names of all partitions must match
+    assert(distinctColNamesOfPartitions.size == 1, {
+      val list = distinctColNamesOfPartitions.mkString("\t", "\n", "")
+      s"Conflicting partition column names detected:\n$list"
+    })
+
+    // Resolves possible type conflicts for each column
+    val resolvedValues = (0 until columnCount).map { i =>
+      resolveTypeConflicts(values.map(_.literals(i)))
+    }
+
+    // Fills resolved literals back to each partition
+    values.zipWithIndex.map { case (d, index) =>
+      d.copy(literals = resolvedValues.map(_(index)))
+    }
+  }
+
+  /**
+   * Converts a string to a `Literal` with automatic type inference.  Currently only supports
+   * [[IntegerType]], [[LongType]], [[FloatType]], [[DoubleType]], [[DecimalType.Unlimited]], and
+   * [[StringType]].  A value equal to `defaultPartitionName` is converted to a null literal of
+   * [[NullType]].
+   */
+  private[parquet] def inferPartitionColumnValue(
+      raw: String,
+      defaultPartitionName: String): Literal = {
+    // First tries integral types
+    Try(Literal(Integer.parseInt(raw), IntegerType))
+      .orElse(Try(Literal(JLong.parseLong(raw), LongType)))
+      // Then falls back to fractional types
+      .orElse(Try(Literal(JFloat.parseFloat(raw), FloatType)))
+      .orElse(Try(Literal(JDouble.parseDouble(raw), DoubleType)))
+      .orElse(Try(Literal(new JBigDecimal(raw), DecimalType.Unlimited)))
+      // Then falls back to string
+      .getOrElse {
+        if (raw == defaultPartitionName) Literal(null, NullType) else Literal(raw, StringType)
+      }
+  }
+
+  private val upCastingOrder: Seq[DataType] =
+    Seq(NullType, IntegerType, LongType, FloatType, DoubleType, DecimalType.Unlimited, StringType)
+
+  /**
+   * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
+   * types.
+   */
+  private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = {
+    val desiredType = {
+      val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
+      // Falls back to string if all values of this column are null or empty string
+      if (topType == NullType) StringType else topType
+    }
+
+    literals.map { case l @ Literal(_, dataType) =>
+      Literal(Cast(l, desiredType).eval(), desiredType)
+    }
+  }
 }

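[Editor's note] As a quick illustration of the partition discovery code above: `parsePartitionColumn` splits each `name=value` directory component, and `inferPartitionColumnValue` picks the narrowest type that can represent the raw string. The following is a minimal, self-contained sketch of that idea, not the actual implementation: the names `PartitionValueInference`, `infer`, and `parseColumn` are made up for the example, plain Scala values stand in for Catalyst `Literal`s, and the cross-partition type resolution done by `resolveTypeConflicts` is omitted.

    import scala.util.Try

    object PartitionValueInference {
      // Tries integral types first, then fractional types, then falls back to
      // a plain string; the configured default partition name maps to null.
      def infer(raw: String, defaultPartitionName: String): Any =
        Try(raw.toInt)
          .orElse(Try(raw.toLong))
          .orElse(Try(raw.toFloat))
          .orElse(Try(raw.toDouble))
          .orElse(Try(BigDecimal(raw)))
          .getOrElse(if (raw == defaultPartitionName) null else raw)

      // Splits one Hive-style directory name such as "a=1" into (column, value).
      def parseColumn(spec: String, defaultPartitionName: String): Option[(String, Any)] = {
        val i = spec.indexOf('=')
        if (i <= 0 || i == spec.length - 1) None
        else Some(spec.take(i) -> infer(spec.drop(i + 1), defaultPartitionName))
      }

      def main(args: Array[String]): Unit = {
        // Placeholder value; Hive conventionally uses "__HIVE_DEFAULT_PARTITION__".
        val default = "__HIVE_DEFAULT_PARTITION__"
        println(parseColumn("a=42", default))        // Some((a,42))    -- inferred as Int
        println(parseColumn("c=3.14", default))      // Some((c,3.14))  -- inferred as Float
        println(parseColumn("b=hello", default))     // Some((b,hello)) -- kept as String
        println(parseColumn(s"d=$default", default)) // Some((d,null))  -- default name means null
      }
    }
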
http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index 386ff24..d23ffb8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -18,12 +18,12 @@
 package org.apache.spark.sql.sources
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, Strategy}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, AttributeSet, Expression, NamedExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, InsertIntoTable => LogicalInsertIntoTable}
-import org.apache.spark.sql.execution
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.{Row, Strategy, execution}
 
 /**
  * A Strategy for planning scans over data sources defined using the sources API.
@@ -54,7 +54,7 @@ private[sql] object DataSourceStrategy extends Strategy {
     case l @ LogicalRelation(t: TableScan) =>
       execution.PhysicalRDD(l.output, t.buildScan()) :: Nil
 
-    case i @ LogicalInsertIntoTable(
+    case i @ logical.InsertIntoTable(
       l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite) =>
       if (partition.nonEmpty) {
         sys.error(s"Insert into a partition is not allowed because $l is not partitioned.")

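[Editor's note] The only substantive change in this hunk is how the clashing `InsertIntoTable` name is disambiguated: a renaming import is replaced by importing the enclosing package and qualifying at the use site. A tiny standalone example of the two equivalent styles, using standard-library names rather than Spark classes (`ImportStyles` is a made-up object):

    // 1. Renaming import: bind java.util.List to a local alias.
    import java.util.{List => JList}
    // 2. Package import: bring in the package and qualify at the use site.
    import java.util

    object ImportStyles {
      def sizes(xs: JList[String], ys: util.List[Integer]): (Int, Int) =
        (xs.size, ys.size)

      def main(args: Array[String]): Unit = {
        val a: JList[String] = java.util.Arrays.asList("a", "b")
        val b: util.List[Integer] = java.util.Arrays.asList(Integer.valueOf(1), Integer.valueOf(2))
        println(sizes(a, b))  // prints (2,2)
      }
    }
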
http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 2ef740b..9c37e01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -241,20 +241,16 @@ object ResolvedDataSource {
     val relation = userSpecifiedSchema match {
       case Some(schema: StructType) => {
         clazz.newInstance match {
-          case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
-            dataSource
-              .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
-              .createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
+          case dataSource: SchemaRelationProvider =>
+            dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
           case dataSource: org.apache.spark.sql.sources.RelationProvider =>
             sys.error(s"${clazz.getCanonicalName} does not allow user-specified schemas.")
         }
       }
       case None => {
         clazz.newInstance match {
-          case dataSource: org.apache.spark.sql.sources.RelationProvider =>
-            dataSource
-              .asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
-              .createRelation(sqlContext, new CaseInsensitiveMap(options))
+          case dataSource: RelationProvider =>
+            dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
           case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
             sys.error(s"A schema needs to be specified when using ${clazz.getCanonicalName}.")
         }
@@ -279,10 +275,8 @@ object ResolvedDataSource {
     }
 
     val relation = clazz.newInstance match {
-      case dataSource: org.apache.spark.sql.sources.CreateableRelationProvider =>
-        dataSource
-          .asInstanceOf[org.apache.spark.sql.sources.CreateableRelationProvider]
-          .createRelation(sqlContext, options, data)
+      case dataSource: CreatableRelationProvider =>
+        dataSource.createRelation(sqlContext, options, data)
       case _ =>
         sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
     }
@@ -366,7 +360,7 @@ private [sql] case class CreateTempTableUsingAsSelect(
 /**
  * Builds a map in which keys are case insensitive
  */
-protected class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String] 
+protected class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
   with Serializable {
 
   val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase))

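[Editor's note] The ddl.scala cleanup above works because a typed pattern in Scala already narrows the static type of the bound variable, so the follow-up `asInstanceOf` casts were redundant. A minimal sketch with hypothetical `Animal`/`Dog`/`Cat` types, purely for illustration:

    object TypedPatternDemo {
      sealed trait Animal
      final case class Dog(name: String) extends Animal
      final case class Cat(lives: Int) extends Animal

      def describe(a: Animal): String = a match {
        // `d` is already statically typed as Dog; writing a.asInstanceOf[Dog] here
        // would only repeat what the pattern has established.
        case d: Dog => s"dog named ${d.name}"
        case c: Cat => s"cat with ${c.lives} lives"
      }

      def main(args: Array[String]): Unit = {
        println(describe(Dog("Rex")))  // dog named Rex
        println(describe(Cat(9)))      // cat with 9 lives
      }
    }
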
http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index ad0a35b..40fc1f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -78,7 +78,7 @@ trait SchemaRelationProvider {
 }
 
 @DeveloperApi
-trait CreateableRelationProvider {
+trait CreatableRelationProvider {
   def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String],

