You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/05/13 20:04:15 UTC

[2/2] spark git commit: [SPARK-7567] [SQL] Migrating Parquet data source to FSBasedRelation

[SPARK-7567] [SQL] Migrating Parquet data source to FSBasedRelation

This PR migrates the Parquet data source to the newly introduced `FSBasedRelation`. `FSBasedParquetRelation` is created to replace `ParquetRelation2`. The major differences are:

1. Partition discovery code has been factored out to `FSBasedRelation`
1. `AppendingParquetOutputFormat` is no longer used. Instead, an anonymous subclass of `ParquetOutputFormat` is used to handle appending and writing dynamic partitions
1. When scanning partitioned tables, `FSBasedParquetRelation.buildScan` only builds an `RDD[Row]` for a single selected partition
1. `FSBasedParquetRelation` doesn't rely on Catalyst expressions for filter push-down, so it no longer extends `CatalystScan`

   After migrating `JSONRelation` (which extends `CatalystScan`), we can remove `CatalystScan`.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/6090)
<!-- Reviewable:end -->

Author: Cheng Lian <li...@databricks.com>

Closes #6090 from liancheng/parquet-migration and squashes the following commits:

6063f87 [Cheng Lian] Casts to OutputCommitter rather than FileOutputCommitter
bfd1cf0 [Cheng Lian] Fixes compilation error introduced while rebasing
f9ea56e [Cheng Lian] Adds ParquetRelation2 related classes to MiMa check whitelist
261d8c1 [Cheng Lian] Minor bug fix and more tests
db65660 [Cheng Lian] Migrates Parquet data source to FSBasedRelation


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ff16e8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ff16e8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ff16e8a

Branch: refs/heads/master
Commit: 7ff16e8abef9fbf4a4855e23c256b22e62e560a6
Parents: bec938f
Author: Cheng Lian <li...@databricks.com>
Authored: Wed May 13 11:04:10 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Wed May 13 11:04:10 2015 -0700

----------------------------------------------------------------------
 project/MimaExcludes.scala                      |   6 +
 .../scala/org/apache/spark/sql/SQLContext.scala |   8 +-
 .../spark/sql/parquet/ParquetFilters.scala      | 278 +++---
 .../sql/parquet/ParquetTableOperations.scala    |   2 +-
 .../spark/sql/parquet/fsBasedParquet.scala      | 565 +++++++++++++
 .../apache/spark/sql/parquet/newParquet.scala   | 840 -------------------
 .../org/apache/spark/sql/sources/commands.scala |  19 +-
 .../spark/sql/parquet/ParquetFilterSuite.scala  |   6 +-
 .../spark/sql/parquet/ParquetIOSuite.scala      |   6 +-
 .../ParquetPartitionDiscoverySuite.scala        |  10 +-
 .../spark/sql/parquet/ParquetSchemaSuite.scala  |  12 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  25 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  15 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |  16 +-
 .../apache/spark/sql/hive/parquetSuites.scala   |  35 +-
 .../sql/sources/FSBasedRelationSuite.scala      | 523 ------------
 .../sql/sources/fsBasedRelationSuites.scala     | 564 +++++++++++++
 17 files changed, 1383 insertions(+), 1547 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7ff16e8a/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a47e29e..f31f0e5 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -111,6 +111,12 @@ object MimaExcludes {
               "org.apache.spark.sql.parquet.ParquetRelation2$PartitionValues"),
             ProblemFilters.exclude[MissingClassProblem](
               "org.apache.spark.sql.parquet.ParquetRelation2$PartitionValues$"),
+            ProblemFilters.exclude[MissingClassProblem](
+              "org.apache.spark.sql.parquet.ParquetRelation2"),
+            ProblemFilters.exclude[MissingClassProblem](
+              "org.apache.spark.sql.parquet.ParquetRelation2$"),
+            ProblemFilters.exclude[MissingClassProblem](
+              "org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache"),
             // These test support classes were moved out of src/main and into src/test:
             ProblemFilters.exclude[MissingClassProblem](
               "org.apache.spark.sql.parquet.ParquetTestData"),

http://git-wip-us.apache.org/repos/asf/spark/blob/7ff16e8a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 975498c..0a148c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -27,9 +27,11 @@ import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
 
 import com.google.common.reflect.TypeToken
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
@@ -42,6 +44,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, e
 import org.apache.spark.sql.execution.{Filter, _}
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
 import org.apache.spark.sql.json._
+import org.apache.spark.sql.parquet.FSBasedParquetRelation
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -641,7 +644,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
     if (paths.isEmpty) {
       emptyDataFrame
     } else if (conf.parquetUseDataSourceApi) {
-      baseRelationToDataFrame(parquet.ParquetRelation2(paths, Map.empty)(this))
+      val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray
+      baseRelationToDataFrame(
+        new FSBasedParquetRelation(
+          globbedPaths.map(_.toString), None, None, Map.empty[String, String])(this))
     } else {
       DataFrame(this, parquet.ParquetRelation(
         paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))

http://git-wip-us.apache.org/repos/asf/spark/blob/7ff16e8a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 5eb1c6a..f0f4e7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -29,128 +29,184 @@ import parquet.io.api.Binary
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
 
 private[sql] object ParquetFilters {
   val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
 
   def createRecordFilter(filterExpressions: Seq[Expression]): Option[Filter] = {
-    filterExpressions.flatMap(createFilter).reduceOption(FilterApi.and).map(FilterCompat.get)
+    filterExpressions.flatMap { filter =>
+      createFilter(filter)
+    }.reduceOption(FilterApi.and).map(FilterCompat.get)
   }
 
-  def createFilter(predicate: Expression): Option[FilterPredicate] = {
-    val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-      case BooleanType =>
-        (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
-      case IntegerType =>
-        (n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer])
-      case LongType =>
-        (n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[java.lang.Long])
-      case FloatType =>
-        (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-      case DoubleType =>
-        (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
-      // Binary.fromString and Binary.fromByteArray don't accept null values
-      case StringType =>
-        (n: String, v: Any) => FilterApi.eq(
-          binaryColumn(n),
-          Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[UTF8String].getBytes)).orNull)
-      case BinaryType =>
-        (n: String, v: Any) => FilterApi.eq(
-          binaryColumn(n),
-          Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-    }
+  private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
+    case BooleanType =>
+      (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
+    case IntegerType =>
+      (n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer])
+    case LongType =>
+      (n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[java.lang.Long])
+    case FloatType =>
+      (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
+    case DoubleType =>
+      (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-      case BooleanType =>
-        (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
-      case IntegerType =>
-        (n: String, v: Any) => FilterApi.notEq(intColumn(n), v.asInstanceOf[Integer])
-      case LongType =>
-        (n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[java.lang.Long])
-      case FloatType =>
-        (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-      case DoubleType =>
-        (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-      case StringType =>
-        (n: String, v: Any) => FilterApi.notEq(
-          binaryColumn(n),
-          Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[UTF8String].getBytes)).orNull)
-      case BinaryType =>
-        (n: String, v: Any) => FilterApi.notEq(
-          binaryColumn(n),
-          Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-    }
+    // Binary.fromString and Binary.fromByteArray don't accept null values
+    case StringType =>
+      (n: String, v: Any) => FilterApi.eq(
+        binaryColumn(n),
+        Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[UTF8String].getBytes)).orNull)
+    case BinaryType =>
+      (n: String, v: Any) => FilterApi.eq(
+        binaryColumn(n),
+        Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+  }
 
-    val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-      case IntegerType =>
-        (n: String, v: Any) => FilterApi.lt(intColumn(n), v.asInstanceOf[Integer])
-      case LongType =>
-        (n: String, v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[java.lang.Long])
-      case FloatType =>
-        (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
-      case DoubleType =>
-        (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-      case StringType =>
-        (n: String, v: Any) =>
-          FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
-      case BinaryType =>
-        (n: String, v: Any) =>
-          FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-    }
+  private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
+    case BooleanType =>
+      (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
+    case IntegerType =>
+      (n: String, v: Any) => FilterApi.notEq(intColumn(n), v.asInstanceOf[Integer])
+    case LongType =>
+      (n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[java.lang.Long])
+    case FloatType =>
+      (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
+    case DoubleType =>
+      (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+    case StringType =>
+      (n: String, v: Any) => FilterApi.notEq(
+        binaryColumn(n),
+        Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[UTF8String].getBytes)).orNull)
+    case BinaryType =>
+      (n: String, v: Any) => FilterApi.notEq(
+        binaryColumn(n),
+        Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+  }
 
-    val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-      case IntegerType =>
-        (n: String, v: Any) => FilterApi.ltEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
-      case LongType =>
-        (n: String, v: Any) => FilterApi.ltEq(longColumn(n), v.asInstanceOf[java.lang.Long])
-      case FloatType =>
-        (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-      case DoubleType =>
-        (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-      case StringType =>
-        (n: String, v: Any) =>
-          FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
-      case BinaryType =>
-        (n: String, v: Any) =>
-          FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-    }
+  private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
+    case IntegerType =>
+      (n: String, v: Any) => FilterApi.lt(intColumn(n), v.asInstanceOf[Integer])
+    case LongType =>
+      (n: String, v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[java.lang.Long])
+    case FloatType =>
+      (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
+    case DoubleType =>
+      (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+    case StringType =>
+      (n: String, v: Any) =>
+        FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
+    case BinaryType =>
+      (n: String, v: Any) =>
+        FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+  }
 
-    val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-      case IntegerType =>
-        (n: String, v: Any) => FilterApi.gt(intColumn(n), v.asInstanceOf[java.lang.Integer])
-      case LongType =>
-        (n: String, v: Any) => FilterApi.gt(longColumn(n), v.asInstanceOf[java.lang.Long])
-      case FloatType =>
-        (n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
-      case DoubleType =>
-        (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-      case StringType =>
-        (n: String, v: Any) =>
-          FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
-      case BinaryType =>
-        (n: String, v: Any) =>
-          FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-    }
+  private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
+    case IntegerType =>
+      (n: String, v: Any) => FilterApi.ltEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
+    case LongType =>
+      (n: String, v: Any) => FilterApi.ltEq(longColumn(n), v.asInstanceOf[java.lang.Long])
+    case FloatType =>
+      (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
+    case DoubleType =>
+      (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+    case StringType =>
+      (n: String, v: Any) =>
+        FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
+    case BinaryType =>
+      (n: String, v: Any) =>
+        FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+  }
 
-    val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-      case IntegerType =>
-        (n: String, v: Any) => FilterApi.gtEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
-      case LongType =>
-        (n: String, v: Any) => FilterApi.gtEq(longColumn(n), v.asInstanceOf[java.lang.Long])
-      case FloatType =>
-        (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-      case DoubleType =>
-        (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-      case StringType =>
-        (n: String, v: Any) =>
-          FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
-      case BinaryType =>
-        (n: String, v: Any) =>
-          FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+  private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
+    case IntegerType =>
+      (n: String, v: Any) => FilterApi.gt(intColumn(n), v.asInstanceOf[java.lang.Integer])
+    case LongType =>
+      (n: String, v: Any) => FilterApi.gt(longColumn(n), v.asInstanceOf[java.lang.Long])
+    case FloatType =>
+      (n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
+    case DoubleType =>
+      (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+    case StringType =>
+      (n: String, v: Any) =>
+        FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
+    case BinaryType =>
+      (n: String, v: Any) =>
+        FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+  }
+
+  private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
+    case IntegerType =>
+      (n: String, v: Any) => FilterApi.gtEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
+    case LongType =>
+      (n: String, v: Any) => FilterApi.gtEq(longColumn(n), v.asInstanceOf[java.lang.Long])
+    case FloatType =>
+      (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
+    case DoubleType =>
+      (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+    case StringType =>
+      (n: String, v: Any) =>
+        FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
+    case BinaryType =>
+      (n: String, v: Any) =>
+        FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+  }
+
+  /**
+   * Converts data sources filters to Parquet filter predicates.
+   */
+  def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
+    val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
+
+    // NOTE:
+    //
+    // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
+    // which can be casted to `false` implicitly. Please refer to the `eval` method of these
+    // operators and the `SimplifyFilters` rule for details.
+    predicate match {
+      case sources.IsNull(name) =>
+        makeEq.lift(dataTypeOf(name)).map(_(name, null))
+      case sources.IsNotNull(name) =>
+        makeNotEq.lift(dataTypeOf(name)).map(_(name, null))
+
+      case sources.EqualTo(name, value) =>
+        makeEq.lift(dataTypeOf(name)).map(_(name, value))
+      case sources.Not(sources.EqualTo(name, value)) =>
+        makeNotEq.lift(dataTypeOf(name)).map(_(name, value))
+
+      case sources.LessThan(name, value) =>
+        makeLt.lift(dataTypeOf(name)).map(_(name, value))
+      case sources.LessThanOrEqual(name, value) =>
+        makeLtEq.lift(dataTypeOf(name)).map(_(name, value))
+
+      case sources.GreaterThan(name, value) =>
+        makeGt.lift(dataTypeOf(name)).map(_(name, value))
+      case sources.GreaterThanOrEqual(name, value) =>
+        makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
+
+      case sources.And(lhs, rhs) =>
+        (createFilter(schema, lhs) ++ createFilter(schema, rhs)).reduceOption(FilterApi.and)
+
+      case sources.Or(lhs, rhs) =>
+        for {
+          lhsFilter <- createFilter(schema, lhs)
+          rhsFilter <- createFilter(schema, rhs)
+        } yield FilterApi.or(lhsFilter, rhsFilter)
+
+      case sources.Not(pred) =>
+        createFilter(schema, pred).map(FilterApi.not)
+
+      case _ => None
     }
+  }
 
+  /**
+   * Converts Catalyst predicate expressions to Parquet filter predicates.
+   *
+   * @todo This can be removed once we get rid of the old Parquet support.
+   */
+  def createFilter(predicate: Expression): Option[FilterPredicate] = {
     // NOTE:
     //
     // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
@@ -170,7 +226,7 @@ private[sql] object ParquetFilters {
         makeEq.lift(dataType).map(_(name, value))
       case EqualTo(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
         makeEq.lift(dataType).map(_(name, value))
-      
+
       case Not(EqualTo(NamedExpression(name, _), NonNullLiteral(value, dataType))) =>
         makeNotEq.lift(dataType).map(_(name, value))
       case Not(EqualTo(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _))) =>
@@ -192,7 +248,7 @@ private[sql] object ParquetFilters {
       case LessThanOrEqual(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
         makeLtEq.lift(dataType).map(_(name, value))
       case LessThanOrEqual(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
-        makeLtEq.lift(dataType).map(_(name, value))      
+        makeLtEq.lift(dataType).map(_(name, value))
       case LessThanOrEqual(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
         makeGtEq.lift(dataType).map(_(name, value))
       case LessThanOrEqual(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
@@ -201,7 +257,7 @@ private[sql] object ParquetFilters {
       case GreaterThan(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
         makeGt.lift(dataType).map(_(name, value))
       case GreaterThan(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
-        makeGt.lift(dataType).map(_(name, value)) 
+        makeGt.lift(dataType).map(_(name, value))
       case GreaterThan(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
         makeLt.lift(dataType).map(_(name, value))
       case GreaterThan(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
@@ -210,7 +266,7 @@ private[sql] object ParquetFilters {
       case GreaterThanOrEqual(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
         makeGtEq.lift(dataType).map(_(name, value))
       case GreaterThanOrEqual(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
-        makeGtEq.lift(dataType).map(_(name, value)) 
+        makeGtEq.lift(dataType).map(_(name, value))
       case GreaterThanOrEqual(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
         makeLtEq.lift(dataType).map(_(name, value))
       case GreaterThanOrEqual(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/7ff16e8a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 75ac52d..90950f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -674,7 +674,7 @@ private[parquet] object FileSystemHelper {
   def findMaxTaskId(pathStr: String, conf: Configuration): Int = {
     val files = FileSystemHelper.listFiles(pathStr, conf)
     // filename pattern is part-r-<int>.parquet
-    val nameP = new scala.util.matching.Regex("""part-r-(\d{1,}).parquet""", "taskid")
+    val nameP = new scala.util.matching.Regex("""part-.-(\d{1,}).*""", "taskid")
     val hiddenFileP = new scala.util.matching.Regex("_.*")
     files.map(_.getName).map {
       case nameP(taskid) => taskid.toInt

http://git-wip-us.apache.org/repos/asf/spark/blob/7ff16e8a/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala
new file mode 100644
index 0000000..d810d6a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala
@@ -0,0 +1,565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+import scala.util.Try
+
+import com.google.common.base.Objects
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import parquet.filter2.predicate.FilterApi
+import parquet.format.converter.ParquetMetadataConverter
+import parquet.hadoop._
+import parquet.hadoop.metadata.CompressionCodecName
+import parquet.hadoop.util.ContextUtil
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD._
+import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.{Row, SQLConf, SQLContext}
+import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
+
+private[sql] class DefaultSource extends FSBasedRelationProvider {
+  override def createRelation(
+      sqlContext: SQLContext,
+      paths: Array[String],
+      schema: Option[StructType],
+      partitionColumns: Option[StructType],
+      parameters: Map[String, String]): FSBasedRelation = {
+    val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty))
+    new FSBasedParquetRelation(paths, schema, partitionSpec, parameters)(sqlContext)
+  }
+}
+
+// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
+private[sql] class ParquetOutputWriter extends OutputWriter {
+  private var recordWriter: RecordWriter[Void, Row] = _
+  private var taskAttemptContext: TaskAttemptContext = _
+
+  override def init(
+      path: String,
+      dataSchema: StructType,
+      context: TaskAttemptContext): Unit = {
+    val conf = context.getConfiguration
+    val outputFormat = {
+      // When appending new Parquet files to an existing Parquet file directory, to avoid
+      // overwriting existing data files, we need to find out the max task ID encoded in these data
+      // file names.
+      // TODO Make this snippet a utility function for other data source developers
+      val maxExistingTaskId = {
+        // Note that `path` may point to a temporary location.  Here we retrieve the real
+        // destination path from the configuration
+        val outputPath = new Path(conf.get("spark.sql.sources.output.path"))
+        val fs = outputPath.getFileSystem(conf)
+
+        if (fs.exists(outputPath)) {
+          // Pattern used to match task ID in part file names, e.g.:
+          //
+          //   part-r-00001.gz.part
+          //          ^~~~~
+          val partFilePattern = """part-.-(\d{1,}).*""".r
+
+          fs.listStatus(outputPath).map(_.getPath.getName).map {
+            case partFilePattern(id) => id.toInt
+            case name if name.startsWith("_") => 0
+            case name if name.startsWith(".") => 0
+            case name => sys.error(
+              s"""Trying to write Parquet files to directory $outputPath,
+                 |but found items with illegal name "$name"
+               """.stripMargin.replace('\n', ' ').trim)
+          }.reduceOption(_ max _).getOrElse(0)
+        } else {
+          0
+        }
+      }
+
+      new ParquetOutputFormat[Row]() {
+        // Here we override `getDefaultWorkFile` for two reasons:
+        //
+        //  1. To allow appending.  We need to generate output file name based on the max available
+        //     task ID computed above.
+        //
+        //  2. To allow dynamic partitioning.  Default `getDefaultWorkFile` uses
+        //     `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
+        //     partitions in the case of dynamic partitioning.
+        override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+          val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1
+          new Path(path, f"part-r-$split%05d$extension")
+        }
+      }
+    }
+
+    recordWriter = outputFormat.getRecordWriter(context)
+    taskAttemptContext = context
+  }
+
+  override def write(row: Row): Unit = recordWriter.write(null, row)
+
+  override def close(): Unit = recordWriter.close(taskAttemptContext)
+}
+
+private[sql] class FSBasedParquetRelation(
+    paths: Array[String],
+    private val maybeDataSchema: Option[StructType],
+    private val maybePartitionSpec: Option[PartitionSpec],
+    parameters: Map[String, String])(
+    val sqlContext: SQLContext)
+  extends FSBasedRelation(paths, maybePartitionSpec)
+  with Logging {
+
+  // Should we merge schemas from all Parquet part-files?
+  private val shouldMergeSchemas =
+    parameters.getOrElse(FSBasedParquetRelation.MERGE_SCHEMA, "true").toBoolean
+
+  private val maybeMetastoreSchema = parameters
+    .get(FSBasedParquetRelation.METASTORE_SCHEMA)
+    .map(DataType.fromJson(_).asInstanceOf[StructType])
+
+  private val metadataCache = new MetadataCache
+  metadataCache.refresh()
+
+  /**
+   * Two relations are equal when they cover the same set of paths, agree on `shouldMergeSchemas`,
+   * `maybeDataSchema` and the partition columns, and -- only when schema merging is disabled --
+   * also agree on the resolved `dataSchema` and `schema`.  With schema merging enabled the
+   * resolved schemas are not compared.
+   */
+  override def equals(other: scala.Any): Boolean = other match {
+    case that: FSBasedParquetRelation =>
+      // The flag itself is always compared.  Checking it only in the merging branch made equality
+      // asymmetric: a non-merging relation could equal a merging one, but not the other way
+      // around.
+      val schemaEquality = this.shouldMergeSchemas == that.shouldMergeSchemas && (
+        shouldMergeSchemas ||
+          (this.dataSchema == that.dataSchema && this.schema == that.schema))
+
+      this.paths.toSet == that.paths.toSet &&
+        schemaEquality &&
+        this.maybeDataSchema == that.maybeDataSchema &&
+        this.partitionColumns == that.partitionColumns
+
+    case _ => false
+  }
+
+  /**
+   * Hashes exactly the components compared by `equals`.
+   */
+  override def hashCode(): Int = {
+    // `partitionColumns` (not `maybePartitionSpec`) is hashed because that is what `equals`
+    // compares; two equal relations may have been constructed with different
+    // `maybePartitionSpec` values and must still produce the same hash code.
+    if (shouldMergeSchemas) {
+      Objects.hashCode(
+        Boolean.box(shouldMergeSchemas),
+        paths.toSet,
+        maybeDataSchema,
+        partitionColumns)
+    } else {
+      Objects.hashCode(
+        Boolean.box(shouldMergeSchemas),
+        paths.toSet,
+        dataSchema,
+        schema,
+        maybeDataSchema,
+        partitionColumns)
+    }
+  }
+
+  override def outputWriterClass: Class[_ <: OutputWriter] = classOf[ParquetOutputWriter]
+
+  override def dataSchema: StructType = metadataCache.dataSchema
+
+  override private[sql] def refresh(): Unit = {
+    metadataCache.refresh()
+    super.refresh()
+  }
+
+  // Parquet data source always uses Catalyst internal representations.
+  override val needConversion: Boolean = false
+
+  // Computed on demand from the cached data file statuses, so the value follows the listing
+  // captured by the most recent `refresh()` rather than the one taken at construction time.
+  override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum
+
+  /**
+   * Configures `job` for writing Parquet files: output committer, write support class, the schema
+   * handed to the write support, and the compression codec.
+   */
+  override def prepareForWrite(job: Job): Unit = {
+    val hadoopConf = ContextUtil.getConfiguration(job)
+
+    // The committer may be overridden through "spark.sql.parquet.output.committer.class", but it
+    // has to be a `ParquetOutputCommitter` (or a subclass of it).
+    val defaultCommitter = classOf[ParquetOutputCommitter]
+    hadoopConf.setClass(
+      "mapred.output.committer.class",
+      hadoopConf.getClass(
+        "spark.sql.parquet.output.committer.class", defaultCommitter, defaultCommitter),
+      defaultCommitter)
+
+    // TODO There's no need to use two kinds of WriteSupport
+    // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
+    // complex types.
+    val onlyPrimitiveColumns =
+      dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)
+    val writeSupportClass =
+      if (onlyPrimitiveColumns) classOf[MutableRowWriteSupport] else classOf[RowWriteSupport]
+
+    ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)
+    RowWriteSupport.setSchema(dataSchema.toAttributes, hadoopConf)
+
+    // Sets compression scheme, defaulting to UNCOMPRESSED when the configured short name is not
+    // one of the known codec names.
+    val codecKey = sqlContext.conf.parquetCompressionCodec.toUpperCase
+    val codec = ParquetRelation
+      .shortParquetCompressionCodecNames
+      .getOrElse(codecKey, CompressionCodecName.UNCOMPRESSED)
+    hadoopConf.set(ParquetOutputFormat.COMPRESSION, codec.name())
+  }
+
+  /**
+   * Builds an RDD of rows scanning the given Parquet files.
+   *
+   * @param requiredColumns names of the columns to read; each one is looked up in `dataSchema`
+   * @param filters data source filters; those that `ParquetFilters.createFilter` can convert are
+   *                AND-ed together and pushed down when Parquet filter push-down is enabled
+   * @param inputPaths paths of the files to scan; they are matched against the string form of the
+   *                   cached data file paths
+   * @return the values of a `NewHadoopRDD` reading through `FilteringParquetRowInputFormat`
+   */
+  override def buildScan(
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      inputPaths: Array[String]): RDD[Row] = {
+
+    val job = Job.getInstance(SparkHadoopUtil.get.conf)
+    val conf = ContextUtil.getConfiguration(job)
+
+    ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
+
+    if (inputPaths.nonEmpty) {
+      FileInputFormat.setInputPaths(job, inputPaths.map(new Path(_)): _*)
+    }
+
+    // Try to push down filters when filter push-down is enabled.
+    if (sqlContext.conf.parquetFilterPushDown) {
+      filters
+        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+        // is used here.
+        .flatMap(ParquetFilters.createFilter(dataSchema, _))
+        .reduceOption(FilterApi.and)
+        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
+    }
+
+    conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
+      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
+      ParquetTypesConverter.convertToString(requestedSchema.toAttributes)
+    })
+
+    conf.set(
+      RowWriteSupport.SPARK_ROW_SCHEMA,
+      ParquetTypesConverter.convertToString(dataSchema.toAttributes))
+
+    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
+    conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
+
+    // Build the lookup set once.  Testing every cached file status against the raw `inputPaths`
+    // array costs a linear scan per file, i.e. quadratic work for large tables.
+    val requestedPaths = inputPaths.toSet
+    val inputFileStatuses =
+      metadataCache.dataStatuses.filter(f => requestedPaths.contains(f.getPath.toString))
+
+    val footers = inputFileStatuses.map(metadataCache.footers)
+
+    // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartitions`.
+    // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
+    // footers.  Especially when a global arbitrative schema (either from metastore or data source
+    // DDL) is available.
+    new NewHadoopRDD(
+      sqlContext.sparkContext,
+      classOf[FilteringParquetRowInputFormat],
+      classOf[Void],
+      classOf[Row],
+      conf) {
+
+      val cacheMetadata = useMetadataCache
+
+      @transient val cachedStatuses = inputFileStatuses.map { f =>
+        // In order to encode the authority of a Path containing special characters such as /,
+        // we need to use the string returned by the URI of the path to create a new Path.
+        val pathWithAuthority = new Path(f.getPath.toUri.toString)
+
+        new FileStatus(
+          f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
+          f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
+      }.toSeq
+
+      @transient val cachedFooters = footers.map { f =>
+        // In order to encode the authority of a Path containing special characters such as /,
+        // we need to use the string returned by the URI of the path to create a new Path.
+        new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
+      }.toSeq
+
+      // Overridden so we can inject our own cached files statuses.
+      override def getPartitions: Array[SparkPartition] = {
+        val inputFormat = if (cacheMetadata) {
+          new FilteringParquetRowInputFormat {
+            override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
+
+            override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
+          }
+        } else {
+          new FilteringParquetRowInputFormat
+        }
+
+        val jobContext = newJobContext(getConf, jobId)
+        val rawSplits = inputFormat.getSplits(jobContext)
+
+        Array.tabulate[SparkPartition](rawSplits.size) { i =>
+          new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+        }
+      }
+    }.values
+  }
+
+  /**
+   * Caches the `FileStatus`es and Parquet footers of all files found under `paths`, together with
+   * the data schema resolved from them.  Every field is (re)populated by `refresh()`.
+   */
+  private class MetadataCache {
+    // `FileStatus` objects of all "_metadata" files.
+    private var metadataStatuses: Array[FileStatus] = _
+
+    // `FileStatus` objects of all "_common_metadata" files.
+    private var commonMetadataStatuses: Array[FileStatus] = _
+
+    // Parquet footer cache.
+    var footers: Map[FileStatus, Footer] = _
+
+    // `FileStatus` objects of all data files (Parquet part-files).
+    var dataStatuses: Array[FileStatus] = _
+
+    // Schema of the actual Parquet files, without partition columns discovered from partition
+    // directory paths.
+    var dataSchema: StructType = _
+
+    /**
+     * Refreshes `FileStatus`es, footers, and the data schema.
+     */
+    def refresh(): Unit = {
+      // Support either reading a collection of raw Parquet part-files, or a collection of folders
+      // containing Parquet files (e.g. partitioned Parquet table).  Paths whose status cannot be
+      // fetched are skipped.
+      val baseStatuses = paths.distinct.flatMap { p =>
+        val path = new Path(p)
+        val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+        val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+        Try(fs.getFileStatus(qualified)).toOption
+      }
+      assert(
+        baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir),
+        s"Paths must be either all files or all directories: ${paths.mkString(", ")}")
+
+      // Lists `FileStatus`es of all leaf nodes (files) under all base directories.  Summary files
+      // are kept; any other file whose name starts with "_" or "." is ignored.
+      val leaves = baseStatuses.flatMap { f =>
+        val fs = FileSystem.get(f.getPath.toUri, SparkHadoopUtil.get.conf)
+        SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
+          isSummaryFile(f.getPath) ||
+            !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
+        }
+      }
+
+      dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
+      metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
+      commonMetadataStatuses =
+        leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
+
+      // Footers are read in parallel, one per data file and summary file.
+      footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
+        val parquetMetadata = ParquetFileReader.readFooter(
+          SparkHadoopUtil.get.conf, f, ParquetMetadataConverter.NO_FILTER)
+        f -> new Footer(f.getPath, parquetMetadata)
+      }.seq.toMap
+
+      dataSchema = {
+        // Precedence: user specified schema, then schema read from the Parquet files, then the
+        // Metastore schema.
+        val dataSchema0 =
+          maybeDataSchema
+            .orElse(readSchema())
+            .orElse(maybeMetastoreSchema)
+            .getOrElse(sys.error("Failed to get the schema."))
+
+        // If this Parquet relation is converted from a Hive Metastore table, must reconcile the
+        // case insensitivity issue and possible schema mismatch (probably caused by schema
+        // evolution).
+        maybeMetastoreSchema
+          .map(FSBasedParquetRelation.mergeMetastoreParquetSchema(_, dataSchema0))
+          .getOrElse(dataSchema0)
+      }
+    }
+
+    // Whether `file` is one of the Parquet summary files ("_metadata" or "_common_metadata").
+    private def isSummaryFile(file: Path): Boolean = {
+      file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
+        file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
+    }
+
+    /**
+     * Reads the schema from the cached footers, touching either all files (when a merged schema
+     * is required) or a single representative file.
+     */
+    private def readSchema(): Option[StructType] = {
+      // Sees which file(s) we need to touch in order to figure out the schema.
+      //
+      // Always tries the summary files first if users don't require a merged schema.  In this case,
+      // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
+      // groups information, and could be much smaller for large Parquet files with lots of row
+      // groups.
+      //
+      // NOTE: Metadata stored in the summary files are merged from all part-files.  However, for
+      // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
+      // how to merge them correctly if some key is associated with different values in different
+      // part-files.  When this happens, Parquet simply gives up generating the summary file.  This
+      // implies that if a summary file is present, then:
+      //
+      //   1. Either all part-files have exactly the same Spark SQL schema, or
+      //   2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
+      //      their schemas may differ from each other).
+      //
+      // Here we tend to be pessimistic and take the second case into account.  Basically this means
+      // we can't trust the summary files if users require a merged schema, and must touch all part-
+      // files to do the merge.
+      val filesToTouch =
+        if (shouldMergeSchemas) {
+          // Also includes summary files, 'cause there might be empty partition directories.
+          (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
+        } else {
+          // Tries any "_common_metadata" first. Parquet files written by old versions of Parquet
+          // don't have this.
+          commonMetadataStatuses.headOption
+            // Falls back to "_metadata"
+            .orElse(metadataStatuses.headOption)
+            // Summary file(s) not found, the Parquet file is either corrupted, or different part-
+            // files contain conflicting user defined metadata (two or more values are associated
+            // with a same key in different files).  In either case, we fall back to the first
+            // part-file, and just assume all schemas are consistent.
+            .orElse(dataStatuses.headOption)
+            .toSeq
+        }
+
+      assert(
+        filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined,
+        "No schema defined, " +
+          s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")
+
+      FSBasedParquetRelation.readSchema(filesToTouch.map(footers.apply), sqlContext)
+    }
+  }
+}
+
+private[sql] object FSBasedParquetRelation extends Logging {
+  // Whether we should merge schemas collected from all Parquet part-files.
+  private[sql] val MERGE_SCHEMA = "mergeSchema"
+
+  // Hive Metastore schema, used when converting Metastore Parquet tables.  This option is only used
+  // internally.
+  private[sql] val METASTORE_SCHEMA = "metastoreSchema"
+
+  /**
+   * Derives a single Spark SQL schema from the given Parquet footers.
+   *
+   * For each footer, the Spark SQL schema serialized in the key-value metadata is preferred.  When
+   * it is absent, cannot be parsed, or is not a `StructType`, the Parquet schema of that footer is
+   * converted instead.  The per-footer schemas are then merged pairwise.
+   *
+   * @return the merged schema, or `None` when `footers` is empty
+   * @throws SparkException if two schemas cannot be merged
+   */
+  private[parquet] def readSchema(
+      footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
+    footers.map { footer =>
+      val metadata = footer.getParquetMetadata.getFileMetaData
+      val parquetSchema = metadata.getSchema
+      val maybeSparkSchema = metadata
+        .getKeyValueMetaData
+        .toMap
+        .get(RowReadSupport.SPARK_METADATA_KEY)
+        .flatMap { serializedSchema =>
+          // Don't throw even if we failed to parse the serialized Spark schema. Just fall back to
+          // whatever is available.
+          Try(DataType.fromJson(serializedSchema))
+            .recover { case scala.util.control.NonFatal(_) =>
+              logInfo(
+                "Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+                  "falling back to the deprecated DataType.fromCaseClassString parser.")
+              DataType.fromCaseClassString(serializedSchema)
+            }
+            // Log and keep the failure explicit, instead of recovering to a `Unit` value and
+            // relying on a later cast to blow up.
+            .recoverWith { case scala.util.control.NonFatal(cause) =>
+              logWarning(
+                s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
+                   |\t$serializedSchema
+                 """.stripMargin,
+                cause)
+              scala.util.Failure(cause)
+            }
+            .toOption
+            // Anything that parsed but is not a struct is treated as absent.
+            .collect { case schema: StructType => schema }
+        }
+
+      maybeSparkSchema.getOrElse {
+        // Falls back to Parquet schema if Spark SQL schema is absent.
+        StructType.fromAttributes(
+          // TODO Really no need to use `Attribute` here, we only need to know the data type.
+          ParquetTypesConverter.convertToAttributes(
+            parquetSchema,
+            sqlContext.conf.isParquetBinaryAsString,
+            sqlContext.conf.isParquetINT96AsTimestamp))
+      }
+    }.reduceOption { (left, right) =>
+      // Only non-fatal errors are wrapped; fatal ones (e.g. `OutOfMemoryError`) propagate as-is.
+      try left.merge(right) catch { case scala.util.control.NonFatal(e) =>
+        throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
+      }
+    }
+  }
+
+  /**
+   * Reconciles the Hive Metastore schema with the schema read from Parquet files.
+   *
+   * Hive doesn't retain case information, while Parquet is case sensitive.  On the other hand,
+   * the schema read from Parquet files may be incomplete (e.g. older versions of Parquet don't
+   * distinguish binary and string).  The result therefore takes its data types from the Metastore
+   * schema and its field names from the Parquet schema, in Metastore field order.
+   *
+   * Fails when the two schemas cannot be matched up field by field (compared case-insensitively).
+   */
+  private[parquet] def mergeMetastoreParquetSchema(
+      metastoreSchema: StructType,
+      parquetSchema: StructType): StructType = {
+    def schemaConflictMessage: String =
+      s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
+         |${metastoreSchema.prettyJson}
+         |
+         |Parquet schema:
+         |${parquetSchema.prettyJson}
+       """.stripMargin
+
+    // Nullable Metastore fields missing from the Parquet schema are added first.
+    val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)
+    assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)
+
+    // Position of every Metastore field, keyed by lower-cased name.
+    val ordinalMap = metastoreSchema.map(_.name.toLowerCase).zipWithIndex.toMap
+    // Fields unknown to the Metastore are sorted after all known ones.
+    val unknownOrdinal = metastoreSchema.size + 1
+    val reorderedParquetSchema = mergedParquetSchema.sortBy { field =>
+      ordinalMap.getOrElse(field.name.toLowerCase, unknownOrdinal)
+    }
+
+    val reconciledFields = metastoreSchema.zip(reorderedParquetSchema).map {
+      case (metastoreField, parquetField) =>
+        if (metastoreField.name.toLowerCase == parquetField.name.toLowerCase) {
+          // Uses Parquet field names but retains Metastore data types.
+          metastoreField.copy(name = parquetField.name)
+        } else {
+          throw new SparkException(schemaConflictMessage)
+        }
+    }
+
+    StructType(reconciledFields)
+  }
+
+  /**
+   * Returns the Parquet schema extended with those nullable fields of the Hive Metastore schema
+   * that the Parquet schema lacks.
+   *
+   * A table partition stored as a Parquet file may not contain a particular nullable field even
+   * though that field is present in the table schema obtained from the Hive Metastore.  Field
+   * names are compared case-insensitively; the appended fields are taken from the Metastore
+   * schema and placed after the original Parquet fields.  Missing non-nullable fields are not
+   * added.
+   */
+  private[parquet] def mergeMissingNullableFields(
+      metastoreSchema: StructType,
+      parquetSchema: StructType): StructType = {
+    val metastoreFieldsByName = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
+    val metastoreNames = metastoreSchema.map(_.name.toLowerCase)
+    val parquetNames = parquetSchema.map(_.name.toLowerCase)
+
+    // Names known to the Metastore but absent from the Parquet schema.
+    val absentNames = metastoreNames.diff(parquetNames)
+    val nullableAbsentFields = absentNames
+      .map(name => metastoreFieldsByName(name))
+      .filter(_.nullable)
+
+    StructType(parquetSchema ++ nullableAbsentFields)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7ff16e8a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
deleted file mode 100644
index ee4b1c7..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ /dev/null
@@ -1,840 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.parquet
-
-import java.io.IOException
-import java.lang.{Double => JDouble, Float => JFloat, Long => JLong}
-import java.math.{BigDecimal => JBigDecimal}
-import java.net.URI
-import java.text.SimpleDateFormat
-import java.util.{Date, List => JList}
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.util.Try
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
-import org.apache.hadoop.mapreduce.{InputSplit, Job, JobContext}
-import parquet.filter2.predicate.FilterApi
-import parquet.format.converter.ParquetMetadataConverter
-import parquet.hadoop.metadata.CompressionCodecName
-import parquet.hadoop.util.ContextUtil
-import parquet.hadoop.{ParquetInputFormat, _}
-
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
-import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, expressions}
-import org.apache.spark.sql.parquet.ParquetTypesConverter._
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType, _}
-import org.apache.spark.sql.{DataFrame, Row, SQLConf, SQLContext, SaveMode}
-import org.apache.spark.{Logging, SerializableWritable, SparkException, TaskContext, Partition => SparkPartition}
-
-/**
- * Allows creation of Parquet based tables using the syntax:
- * {{{
- *   CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet OPTIONS (...)
- * }}}
- *
- * Supported options include:
- *
- *  - `path`: Required. When reading Parquet files, `path` should point to the location of the
- *    Parquet file(s). It can be either a single raw Parquet file, or a directory of Parquet files.
- *    In the latter case, this data source tries to discover partitioning information if the the
- *    directory is structured in the same style of Hive partitioned tables. When writing Parquet
- *    file, `path` should point to the destination folder.
- *
- *  - `mergeSchema`: Optional. Indicates whether we should merge potentially different (but
- *    compatible) schemas stored in all Parquet part-files.
- *
- *  - `partition.defaultName`: Optional. Partition name used when a value of a partition column is
- *    null or empty string. This is similar to the `hive.exec.default.partition.name` configuration
- *    in Hive.
- */
-private[sql] class DefaultSource
-    extends RelationProvider
-    with SchemaRelationProvider
-    with CreatableRelationProvider {
-
-  private def checkPath(parameters: Map[String, String]): String = {
-    parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables."))
-  }
-
-  /** Returns a new base relation with the given parameters. */
-  override def createRelation(
-      sqlContext: SQLContext,
-      parameters: Map[String, String]): BaseRelation = {
-    ParquetRelation2(Seq(checkPath(parameters)), parameters, None)(sqlContext)
-  }
-
-  /** Returns a new base relation with the given parameters and schema. */
-  override def createRelation(
-      sqlContext: SQLContext,
-      parameters: Map[String, String],
-      schema: StructType): BaseRelation = {
-    ParquetRelation2(Seq(checkPath(parameters)), parameters, Some(schema))(sqlContext)
-  }
-
-  /** Returns a new base relation with the given parameters and save given data into it. */
-  override def createRelation(
-      sqlContext: SQLContext,
-      mode: SaveMode,
-      parameters: Map[String, String],
-      data: DataFrame): BaseRelation = {
-    val path = checkPath(parameters)
-    val filesystemPath = new Path(path)
-    val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-    val doInsertion = (mode, fs.exists(filesystemPath)) match {
-      case (SaveMode.ErrorIfExists, true) =>
-        sys.error(s"path $path already exists.")
-      case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
-        true
-      case (SaveMode.Ignore, exists) =>
-        !exists
-    }
-
-    val relation = if (doInsertion) {
-      // This is a hack. We always set nullable/containsNull/valueContainsNull to true
-      // for the schema of a parquet data.
-      val df =
-        sqlContext.createDataFrame(
-          data.queryExecution.toRdd,
-          data.schema.asNullable,
-          needsConversion = false)
-      val createdRelation =
-        createRelation(sqlContext, parameters, df.schema).asInstanceOf[ParquetRelation2]
-      createdRelation.insert(df, overwrite = mode == SaveMode.Overwrite)
-      createdRelation
-    } else {
-      // If the save mode is Ignore, we will just create the relation based on existing data.
-      createRelation(sqlContext, parameters)
-    }
-
-    relation
-  }
-}
-
-/**
- * An alternative to [[ParquetRelation]] that plugs in using the data sources API.  This class is
- * intended as a full replacement of the Parquet support in Spark SQL.  The old implementation will
- * be deprecated and eventually removed once this version is proved to be stable enough.
- *
- * Compared with the old implementation, this class has the following notable differences:
- *
- *  - Partitioning discovery: Hive style multi-level partitions are auto discovered.
- *  - Metadata discovery: Parquet is a format comes with schema evolving support.  This data source
- *    can detect and merge schemas from all Parquet part-files as long as they are compatible.
- *    Also, metadata and [[FileStatus]]es are cached for better performance.
- *  - Statistics: Statistics for the size of the table are automatically populated during schema
- *    discovery.
- */
-@DeveloperApi
-private[sql] case class ParquetRelation2(
-    paths: Seq[String],
-    parameters: Map[String, String],
-    maybeSchema: Option[StructType] = None,
-    maybePartitionSpec: Option[PartitionSpec] = None)(
-    @transient val sqlContext: SQLContext)
-  extends BaseRelation
-  with CatalystScan
-  with InsertableRelation
-  with SparkHadoopMapReduceUtil
-  with Logging {
-
-  // Should we merge schemas from all Parquet part-files?
-  private val shouldMergeSchemas =
-    parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean
-
-  // Optional Metastore schema, used when converting Hive Metastore Parquet table
-  private val maybeMetastoreSchema =
-    parameters
-      .get(ParquetRelation2.METASTORE_SCHEMA)
-      .map(s => DataType.fromJson(s).asInstanceOf[StructType])
-
-  // Hive uses this as part of the default partition name when the partition column value is null
-  // or empty string
-  private val defaultPartitionName = parameters.getOrElse(
-    ParquetRelation2.DEFAULT_PARTITION_NAME, "__HIVE_DEFAULT_PARTITION__")
-
-  override def equals(other: Any): Boolean = other match {
-    case relation: ParquetRelation2 =>
-      // If schema merging is required, we don't compare the actual schemas since they may evolve.
-      val schemaEquality = if (shouldMergeSchemas) {
-        shouldMergeSchemas == relation.shouldMergeSchemas
-      } else {
-        schema == relation.schema
-      }
-
-      paths.toSet == relation.paths.toSet &&
-        schemaEquality &&
-        maybeMetastoreSchema == relation.maybeMetastoreSchema &&
-        maybePartitionSpec == relation.maybePartitionSpec
-
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    if (shouldMergeSchemas) {
-      com.google.common.base.Objects.hashCode(
-        shouldMergeSchemas: java.lang.Boolean,
-        paths.toSet,
-        maybeMetastoreSchema,
-        maybePartitionSpec)
-    } else {
-      com.google.common.base.Objects.hashCode(
-        shouldMergeSchemas: java.lang.Boolean,
-        schema,
-        paths.toSet,
-        maybeMetastoreSchema,
-        maybePartitionSpec)
-    }
-  }
-
-  private[sql] def sparkContext = sqlContext.sparkContext
-
-  private class MetadataCache {
-    // `FileStatus` objects of all "_metadata" files.
-    private var metadataStatuses: Array[FileStatus] = _
-
-    // `FileStatus` objects of all "_common_metadata" files.
-    private var commonMetadataStatuses: Array[FileStatus] = _
-
-    // Parquet footer cache.
-    var footers: Map[FileStatus, Footer] = _
-
-    // `FileStatus` objects of all data files (Parquet part-files).
-    var dataStatuses: Array[FileStatus] = _
-
-    // Partition spec of this table, including names, data types, and values of each partition
-    // column, and paths of each partition.
-    var partitionSpec: PartitionSpec = _
-
-    // Schema of the actual Parquet files, without partition columns discovered from partition
-    // directory paths.
-    var parquetSchema: StructType = _
-
-    // Schema of the whole table, including partition columns.
-    var schema: StructType = _
-
-    // Indicates whether partition columns are also included in Parquet data file schema.  If not,
-    // we need to fill in partition column values into read rows when scanning the table.
-    var partitionKeysIncludedInParquetSchema: Boolean = _
-
-    def prepareMetadata(path: Path, schema: StructType, conf: Configuration): Unit = {
-      conf.set(
-        ParquetOutputFormat.COMPRESSION,
-        ParquetRelation
-          .shortParquetCompressionCodecNames
-          .getOrElse(
-            sqlContext.conf.parquetCompressionCodec.toUpperCase,
-            CompressionCodecName.UNCOMPRESSED).name())
-
-      ParquetRelation.enableLogForwarding()
-      ParquetTypesConverter.writeMetaData(schema.toAttributes, path, conf)
-    }
-
-    /**
-     * Refreshes `FileStatus`es, footers, partition spec, and table schema.
-     */
-    def refresh(): Unit = {
-      // Support either reading a collection of raw Parquet part-files, or a collection of folders
-      // containing Parquet files (e.g. partitioned Parquet table).
-      val baseStatuses = paths.distinct.map { p =>
-        val fs = FileSystem.get(URI.create(p), sparkContext.hadoopConfiguration)
-        val path = new Path(p)
-        val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-
-        if (!fs.exists(qualified) && maybeSchema.isDefined) {
-          fs.mkdirs(qualified)
-          prepareMetadata(qualified, maybeSchema.get, sparkContext.hadoopConfiguration)
-        }
-
-        fs.getFileStatus(qualified)
-      }.toArray
-      assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir))
-
-      // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
-      val leaves = baseStatuses.flatMap { f =>
-        val fs = FileSystem.get(f.getPath.toUri, sparkContext.hadoopConfiguration)
-        SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
-          isSummaryFile(f.getPath) ||
-            !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
-        }
-      }
-
-      dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
-      metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
-      commonMetadataStatuses =
-        leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
-
-      footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
-        val parquetMetadata = ParquetFileReader.readFooter(
-          sparkContext.hadoopConfiguration, f, ParquetMetadataConverter.NO_FILTER)
-        f -> new Footer(f.getPath, parquetMetadata)
-      }.seq.toMap
-
-      partitionSpec = maybePartitionSpec.getOrElse {
-        val partitionDirs = leaves
-          .filterNot(baseStatuses.contains)
-          .map(_.getPath.getParent)
-          .distinct
-
-        if (partitionDirs.nonEmpty) {
-          // Parses names and values of partition columns, and infer their data types.
-          PartitioningUtils.parsePartitions(partitionDirs, defaultPartitionName)
-        } else {
-          // No partition directories found, makes an empty specification
-          PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[Partition])
-        }
-      }
-
-      // To get the schema. We first try to get the schema defined in maybeSchema.
-      // If maybeSchema is not defined, we will try to get the schema from existing parquet data
-      // (through readSchema). If data does not exist, we will try to get the schema defined in
-      // maybeMetastoreSchema (defined in the options of the data source).
-      // Finally, if we still could not get the schema. We throw an error.
-      parquetSchema =
-        maybeSchema
-          .orElse(readSchema())
-          .orElse(maybeMetastoreSchema)
-          .getOrElse(sys.error("Failed to get the schema."))
-
-      partitionKeysIncludedInParquetSchema =
-        isPartitioned &&
-          partitionColumns.forall(f => parquetSchema.fieldNames.contains(f.name))
-
-      schema = {
-        val fullRelationSchema = if (partitionKeysIncludedInParquetSchema) {
-          parquetSchema
-        } else {
-          StructType(parquetSchema.fields ++ partitionColumns.fields)
-        }
-
-        // If this Parquet relation is converted from a Hive Metastore table, must reconcile case
-        // insensitivity issue and possible schema mismatch.
-        maybeMetastoreSchema
-          .map(ParquetRelation2.mergeMetastoreParquetSchema(_, fullRelationSchema))
-          .getOrElse(fullRelationSchema)
-      }
-    }
-
-    private def readSchema(): Option[StructType] = {
-      // Sees which file(s) we need to touch in order to figure out the schema.
-      val filesToTouch =
-      // Always tries the summary files first if users don't require a merged schema.  In this case,
-      // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
-      // groups information, and could be much smaller for large Parquet files with lots of row
-      // groups.
-      //
-      // NOTE: Metadata stored in the summary files are merged from all part-files.  However, for
-      // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
-      // how to merge them correctly if some key is associated with different values in different
-      // part-files.  When this happens, Parquet simply gives up generating the summary file.  This
-      // implies that if a summary file is present, then:
-      //
-      //   1. Either all part-files have exactly the same Spark SQL schema, or
-      //   2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
-      //      their schemas may differ from each other).
-      //
-      // Here we tend to be pessimistic and take the second case into account.  Basically this means
-      // we can't trust the summary files if users require a merged schema, and must touch all part-
-      // files to do the merge.
-        if (shouldMergeSchemas) {
-          // Also includes summary files, because there might be empty partition directories.
-          (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
-        } else {
-          // Tries any "_common_metadata" first. Parquet files written by old versions of Parquet
-          // don't have this.
-          commonMetadataStatuses.headOption
-            // Falls back to "_metadata"
-            .orElse(metadataStatuses.headOption)
-            // Summary file(s) not found, the Parquet file is either corrupted, or different part-
-            // files contain conflicting user defined metadata (two or more values are associated
-            // with the same key in different files).  In either case, we fall back to the
-            // first part-file, and just assume all schemas are consistent.
-            .orElse(dataStatuses.headOption)
-            .toSeq
-        }
-
-      ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
-    }
-  }
-
-  @transient private val metadataCache = new MetadataCache
-  metadataCache.refresh()
-
-  def partitionSpec: PartitionSpec = metadataCache.partitionSpec
-
-  def partitionColumns: StructType = metadataCache.partitionSpec.partitionColumns
-
-  def partitions: Seq[Partition] = metadataCache.partitionSpec.partitions
-
-  def isPartitioned: Boolean = partitionColumns.nonEmpty
-
-  private def partitionKeysIncludedInDataSchema = metadataCache.partitionKeysIncludedInParquetSchema
-
-  private def parquetSchema = metadataCache.parquetSchema
-
-  override def schema: StructType = metadataCache.schema
-
-  private def isSummaryFile(file: Path): Boolean = {
-    file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
-      file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
-  }
-
-  // Skip type conversion
-  override val needConversion: Boolean = false
-
-  // TODO Should calculate per scan size
-  // It's common that a query only scans a fraction of a large Parquet file.  Returning size of the
-  // whole Parquet file disables some optimizations in this case (e.g. broadcast join).
-  override val sizeInBytes = metadataCache.dataStatuses.map(_.getLen).sum
-
-  // This is mostly a hack so that we can use the existing parquet filter code.
-  override def buildScan(output: Seq[Attribute], predicates: Seq[Expression]): RDD[Row] = {
-    val job = new Job(sparkContext.hadoopConfiguration)
-    ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
-    val jobConf: Configuration = ContextUtil.getConfiguration(job)
-
-    val selectedPartitions = prunePartitions(predicates, partitions)
-    val selectedFiles = if (isPartitioned) {
-      selectedPartitions.flatMap { p =>
-        metadataCache.dataStatuses.filter(_.getPath.getParent.toString == p.path)
-      }
-    } else {
-      metadataCache.dataStatuses.toSeq
-    }
-    val selectedFooters = selectedFiles.map(metadataCache.footers)
-
-    // FileInputFormat cannot handle empty lists.
-    if (selectedFiles.nonEmpty) {
-      // In order to encode the authority of a Path containing special characters such as /,
-      // we need to use the string returned by the URI of the path to create a new Path.
-      val selectedPaths = selectedFiles.map(status => new Path(status.getPath.toUri.toString))
-      FileInputFormat.setInputPaths(job, selectedPaths: _*)
-    }
-
-    // Try to push down filters when filter push-down is enabled.
-    if (sqlContext.conf.parquetFilterPushDown) {
-      val partitionColNames = partitionColumns.map(_.name).toSet
-      predicates
-        // Don't push down predicates which reference partition columns
-        .filter { pred =>
-          val referencedColNames = pred.references.map(_.name).toSet
-          referencedColNames.intersect(partitionColNames).isEmpty
-        }
-        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-        // is used here.
-        .flatMap(ParquetFilters.createFilter)
-        .reduceOption(FilterApi.and)
-        .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
-    }
-
-    if (isPartitioned) {
-      logInfo {
-        val percentRead = selectedPartitions.size.toDouble / partitions.size.toDouble * 100
-        s"Reading $percentRead% of partitions"
-      }
-    }
-
-    val requiredColumns = output.map(_.name)
-    val requestedSchema = StructType(requiredColumns.map(schema(_)))
-
-    // Store both requested and original schema in `Configuration`
-    jobConf.set(
-      RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-      convertToString(requestedSchema.toAttributes))
-    jobConf.set(
-      RowWriteSupport.SPARK_ROW_SCHEMA,
-      convertToString(schema.toAttributes))
-
-    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
-    val useCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
-    jobConf.set(SQLConf.PARQUET_CACHE_METADATA, useCache.toString)
-
-    val baseRDD =
-      new NewHadoopRDD(
-          sparkContext,
-          classOf[FilteringParquetRowInputFormat],
-          classOf[Void],
-          classOf[Row],
-          jobConf) {
-        val cacheMetadata = useCache
-
-        @transient
-        val cachedStatus = selectedFiles.map { st =>
-          // In order to encode the authority of a Path containing special characters such as /,
-          // we need to use the string returned by the URI of the path to create a new Path.
-          val newPath = new Path(st.getPath.toUri.toString)
-
-          new FileStatus(
-            st.getLen,
-            st.isDir,
-            st.getReplication,
-            st.getBlockSize,
-            st.getModificationTime,
-            st.getAccessTime,
-            st.getPermission,
-            st.getOwner,
-            st.getGroup,
-            newPath)
-        }
-
-        @transient
-        val cachedFooters = selectedFooters.map { f =>
-          // In order to encode the authority of a Path containing special characters such as /,
-          // we need to use the string returned by the URI of the path to create a new Path.
-          new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
-        }
-
-
-        // Overridden so we can inject our own cached files statuses.
-        override def getPartitions: Array[SparkPartition] = {
-          val inputFormat = if (cacheMetadata) {
-            new FilteringParquetRowInputFormat {
-              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
-
-              override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
-            }
-          } else {
-            new FilteringParquetRowInputFormat
-          }
-
-          val jobContext = newJobContext(getConf, jobId)
-          val rawSplits = inputFormat.getSplits(jobContext)
-
-          Array.tabulate[SparkPartition](rawSplits.size) { i =>
-            new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
-          }
-        }
-      }
-
-    // The ordinals for partition keys in the result row, if requested.
-    val partitionKeyLocations = partitionColumns.fieldNames.zipWithIndex.map {
-      case (name, index) => index -> requiredColumns.indexOf(name)
-    }.toMap.filter {
-      case (_, index) => index >= 0
-    }
-
-    // When the data does not include the key and the key is requested then we must fill it in
-    // based on information from the input split.
-    if (!partitionKeysIncludedInDataSchema && partitionKeyLocations.nonEmpty) {
-      // This check is based on CatalystConverter.createRootConverter.
-      val primitiveRow =
-        requestedSchema.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
-
-      baseRDD.mapPartitionsWithInputSplit { case (split: ParquetInputSplit, iterator) =>
-        val partValues = selectedPartitions.collectFirst {
-          case p if split.getPath.getParent.toString == p.path =>
-            CatalystTypeConverters.convertToCatalyst(p.values).asInstanceOf[Row]
-        }.get
-
-        val requiredPartOrdinal = partitionKeyLocations.keys.toSeq
-
-        if (primitiveRow) {
-          iterator.map { pair =>
-            // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
-            val row = pair._2.asInstanceOf[SpecificMutableRow]
-            var i = 0
-            while (i < requiredPartOrdinal.size) {
-              // TODO Avoids boxing cost here!
-              val partOrdinal = requiredPartOrdinal(i)
-              row.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
-              i += 1
-            }
-            row
-          }
-        } else {
-          // Create a mutable row since we need to fill in values from partition columns.
-          val mutableRow = new GenericMutableRow(requestedSchema.size)
-          iterator.map { pair =>
-            // We are using CatalystGroupConverter and it returns a GenericRow.
-            // Since GenericRow is not mutable, we just cast it to a Row.
-            val row = pair._2.asInstanceOf[Row]
-            var i = 0
-            while (i < row.size) {
-              // TODO Avoids boxing cost here!
-              mutableRow(i) = row(i)
-              i += 1
-            }
-
-            i = 0
-            while (i < requiredPartOrdinal.size) {
-              // TODO Avoids boxing cost here!
-              val partOrdinal = requiredPartOrdinal(i)
-              mutableRow.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
-              i += 1
-            }
-            mutableRow
-          }
-        }
-      }
-    } else {
-      baseRDD.map(_._2)
-    }
-  }
-
-  private def prunePartitions(
-      predicates: Seq[Expression],
-      partitions: Seq[Partition]): Seq[Partition] = {
-    val partitionColumnNames = partitionColumns.map(_.name).toSet
-    val partitionPruningPredicates = predicates.filter {
-      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
-    }
-
-    val rawPredicate =
-      partitionPruningPredicates.reduceOption(expressions.And).getOrElse(Literal(true))
-    val boundPredicate = InterpretedPredicate.create(rawPredicate transform {
-      case a: AttributeReference =>
-        val index = partitionColumns.indexWhere(a.name == _.name)
-        BoundReference(index, partitionColumns(index).dataType, nullable = true)
-    })
-
-    if (isPartitioned && partitionPruningPredicates.nonEmpty) {
-      partitions.filter(p => boundPredicate(p.values))
-    } else {
-      partitions
-    }
-  }
-
-  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
-    assert(paths.size == 1, s"Can't write to multiple destinations: ${paths.mkString(",")}")
-
-    // TODO: currently we do not check whether the "schema"s are compatible
-    // That means if one first creates a table and then INSERTs data with
-    // an incompatible schema the execution will fail. It would be nice
-    // to catch this early on, maybe having the planner validate the schema
-    // before calling execute().
-
-    val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
-    val writeSupport =
-      if (parquetSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
-        log.debug("Initializing MutableRowWriteSupport")
-        classOf[MutableRowWriteSupport]
-      } else {
-        classOf[RowWriteSupport]
-      }
-
-    ParquetOutputFormat.setWriteSupportClass(job, writeSupport)
-
-    val conf = ContextUtil.getConfiguration(job)
-    RowWriteSupport.setSchema(data.schema.toAttributes, conf)
-
-    val destinationPath = new Path(paths.head)
-
-    if (overwrite) {
-      val fs = destinationPath.getFileSystem(conf)
-      if (fs.exists(destinationPath)) {
-        var success: Boolean = false
-        try {
-          success = fs.delete(destinationPath, true)
-        } catch {
-          case e: IOException =>
-            throw new IOException(
-              s"Unable to clear output directory ${destinationPath.toString} prior" +
-                s" to writing to Parquet table:\n${e.toString}")
-        }
-        if (!success) {
-          throw new IOException(
-            s"Unable to clear output directory ${destinationPath.toString} prior" +
-              s" to writing to Parquet table.")
-        }
-      }
-    }
-
-    job.setOutputKeyClass(classOf[Void])
-    job.setOutputValueClass(classOf[Row])
-    FileOutputFormat.setOutputPath(job, destinationPath)
-
-    val wrappedConf = new SerializableWritable(job.getConfiguration)
-    val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmm").format(new Date())
-    val stageId = sqlContext.sparkContext.newRddId()
-
-    val taskIdOffset = if (overwrite) {
-      1
-    } else {
-      FileSystemHelper.findMaxTaskId(
-        FileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
-    }
-
-    def writeShard(context: TaskContext, iterator: Iterator[Row]): Unit = {
-      /* "reduce task" <split #> <attempt # = spark task #> */
-      val attemptId = newTaskAttemptID(
-        jobTrackerId, stageId, isMap = false, context.partitionId(), context.attemptNumber())
-      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
-      val format = new AppendingParquetOutputFormat(taskIdOffset)
-      val committer = format.getOutputCommitter(hadoopContext)
-      committer.setupTask(hadoopContext)
-      val writer = format.getRecordWriter(hadoopContext)
-      try {
-        while (iterator.hasNext) {
-          val row = iterator.next()
-          writer.write(null, row)
-        }
-      } finally {
-        writer.close(hadoopContext)
-      }
-
-      SparkHadoopMapRedUtil.commitTask(committer, hadoopContext, context)
-    }
-    val jobFormat = new AppendingParquetOutputFormat(taskIdOffset)
-    /* apparently we need a TaskAttemptID to construct an OutputCommitter;
-     * however we're only going to use this local OutputCommitter for
-     * setupJob/commitJob, so we just use a dummy "map" task.
-     */
-    val jobAttemptId = newTaskAttemptID(jobTrackerId, stageId, isMap = true, 0, 0)
-    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
-    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
-
-    jobCommitter.setupJob(jobTaskContext)
-    sqlContext.sparkContext.runJob(data.queryExecution.executedPlan.execute(), writeShard _)
-    jobCommitter.commitJob(jobTaskContext)
-
-    metadataCache.refresh()
-  }
-}
-
-private[sql] object ParquetRelation2 extends Logging {
-  // Whether we should merge schemas collected from all Parquet part-files.
-  val MERGE_SCHEMA = "mergeSchema"
-
-  // Default partition name to use when the partition column value is null or empty string.
-  val DEFAULT_PARTITION_NAME = "partition.defaultName"
-
-  // Hive Metastore schema, used when converting Metastore Parquet tables.  This option is only used
-  // internally.
-  private[sql] val METASTORE_SCHEMA = "metastoreSchema"
-
-  private[parquet] def readSchema(
-      footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
-    footers.map { footer =>
-      val metadata = footer.getParquetMetadata.getFileMetaData
-      val parquetSchema = metadata.getSchema
-      val maybeSparkSchema = metadata
-        .getKeyValueMetaData
-        .toMap
-        .get(RowReadSupport.SPARK_METADATA_KEY)
-        .flatMap { serializedSchema =>
-          // Don't throw even if we failed to parse the serialized Spark schema. Just fall back to
-          // whatever is available.
-          Try(DataType.fromJson(serializedSchema))
-            .recover { case _: Throwable =>
-              logInfo(
-                s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
-                  "falling back to the deprecated DataType.fromCaseClassString parser.")
-              DataType.fromCaseClassString(serializedSchema)
-            }
-            .recover { case cause: Throwable =>
-              logWarning(
-                s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
-                   |\t$serializedSchema
-                 """.stripMargin,
-                cause)
-            }
-            .map(_.asInstanceOf[StructType])
-            .toOption
-        }
-
-      maybeSparkSchema.getOrElse {
-        // Falls back to Parquet schema if Spark SQL schema is absent.
-        StructType.fromAttributes(
-          // TODO Really no need to use `Attribute` here, we only need to know the data type.
-          convertToAttributes(
-            parquetSchema,
-            sqlContext.conf.isParquetBinaryAsString,
-            sqlContext.conf.isParquetINT96AsTimestamp))
-      }
-    }.reduceOption { (left, right) =>
-      try left.merge(right) catch { case e: Throwable =>
-        throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
-      }
-    }
-  }
-
-  /**
-   * Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore
-   * schema and Parquet schema.
-   *
-   * Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the
-   * schema read from Parquet files may be incomplete (e.g. older versions of Parquet don't
-   * distinguish binary and string).  This method generates a correct schema by merging Metastore
-   * schema data types and Parquet schema field names.
-   */
-  private[parquet] def mergeMetastoreParquetSchema(
-      metastoreSchema: StructType,
-      parquetSchema: StructType): StructType = {
-    def schemaConflictMessage: String =
-      s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
-         |${metastoreSchema.prettyJson}
-         |
-         |Parquet schema:
-         |${parquetSchema.prettyJson}
-       """.stripMargin
-
-    val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)
-
-    assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)
-
-    val ordinalMap = metastoreSchema.zipWithIndex.map {
-      case (field, index) => field.name.toLowerCase -> index
-    }.toMap
-    val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
-      ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
-
-    StructType(metastoreSchema.zip(reorderedParquetSchema).map {
-      // Uses Parquet field names but retains Metastore data types.
-      case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
-        mSchema.copy(name = pSchema.name)
-      case _ =>
-        throw new SparkException(schemaConflictMessage)
-    })
-  }
-
-  /**
-   * Returns the original schema from the Parquet file with any missing nullable fields from the
-   * Hive Metastore schema merged in.
-   *
-   * When constructing a DataFrame from a collection of structured data, the resulting object has
-   * a schema corresponding to the union of the fields present in each element of the collection.
-   * Spark SQL simply assigns a null value to any field that isn't present for a particular row.
-   * In some cases, it is possible that a given table partition stored as a Parquet file doesn't
-   * contain a particular nullable field in its schema despite that field being present in the
-   * table schema obtained from the Hive Metastore. This method returns a schema representing the
-   * Parquet file schema along with any additional nullable fields from the Metastore schema
-   * merged in.
-   */
-  private[parquet] def mergeMissingNullableFields(
-      metastoreSchema: StructType,
-      parquetSchema: StructType): StructType = {
-    val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
-    val missingFields = metastoreSchema
-      .map(_.name.toLowerCase)
-      .diff(parquetSchema.map(_.name.toLowerCase))
-      .map(fieldMap(_))
-      .filter(_.nullable)
-    StructType(parquetSchema ++ missingFields)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/7ff16e8a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index a294297..7879328 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -293,9 +293,18 @@ private[sql] abstract class BaseWriterContainer(
   }
 
   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
-    outputFormatClass.newInstance().getOutputCommitter(context)
+    val committerClass = context.getConfiguration.getClass(
+      "mapred.output.committer.class", null, classOf[OutputCommitter])
+
+    Option(committerClass).map { clazz =>
+      val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+      ctor.newInstance(new Path(outputPath), context)
+    }.getOrElse {
+      outputFormatClass.newInstance().getOutputCommitter(context)
+    }
   }
 
+
   private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
     this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
     this.taskId = new TaskID(this.jobId, true, splitId)
@@ -345,6 +354,7 @@ private[sql] class DefaultWriterContainer(
 
   override protected def initWriters(): Unit = {
     writer = outputWriterClass.newInstance()
+    taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath)
     writer.init(getWorkPath, dataSchema, taskAttemptContext)
   }
 
@@ -384,11 +394,14 @@ private[sql] class DynamicPartitionWriterContainer(
         DynamicPartitionWriterContainer.escapePathName(string)
       }
       s"/$col=$valueString"
-    }.mkString
+    }.mkString.stripPrefix(Path.SEPARATOR)
 
     outputWriters.getOrElseUpdate(partitionPath, {
-      val path = new Path(getWorkPath, partitionPath.stripPrefix(Path.SEPARATOR))
+      val path = new Path(getWorkPath, partitionPath)
       val writer = outputWriterClass.newInstance()
+      taskAttemptContext.getConfiguration.set(
+        "spark.sql.sources.output.path",
+        new Path(outputPath, partitionPath).toString)
       writer.init(path.toString, dataSchema, taskAttemptContext)
       writer
     })


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org