Posted to commits@spark.apache.org by li...@apache.org on 2015/10/09 01:18:40 UTC

spark git commit: [SPARK-8848] [SQL] Refactors Parquet write path to follow parquet-format

Repository: spark
Updated Branches:
  refs/heads/master 2816c89b6 -> 02149ff08


[SPARK-8848] [SQL] Refactors Parquet write path to follow parquet-format

This PR refactors the Parquet write path to follow the parquet-format spec.  It's a successor of PR #7679, but with fewer non-essential changes.

Major changes include:

1.  Replaces `RowWriteSupport` and `MutableRowWriteSupport` with `CatalystWriteSupport`

    - Writes Parquet data using the standard layout defined in parquet-format

      Specifically, we are now writing ...

      - ... arrays and maps in standard 3-level structure with proper annotations and field names
      - ... decimals as `INT32` and `INT64` whenever possible, falling back to `FIXED_LEN_BYTE_ARRAY` otherwise

    - Supports a legacy mode that is compatible with Spark 1.4 and prior versions

      Legacy mode is off by default and can be turned on by setting the SQL option `spark.sql.parquet.writeLegacyFormat` to `true` (see the usage sketch below).

    - Eliminates per-value data type dispatching costs via prebuilt composed writer functions

2.  Cleans up the last pieces of the old Parquet support code

As pointed out by rxin previously, we probably want to rename all those `Catalyst*` Parquet classes to `Parquet*` for clarity.  But I'd like to do this in a follow-up PR to minimize code review noise in this one.
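
For reference, a minimal usage sketch of the new default layout and the legacy switch (an editor's illustration, not part of this patch; `sqlContext` is assumed to be an existing `SQLContext`, and the output paths are placeholders):

    // Standard parquet-format layout (the new default, writeLegacyFormat = false)
    sqlContext.range(0, 10).write.parquet("/tmp/parquet-standard")

    // Opt back into the Spark 1.4-compatible legacy layout for older readers
    sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")
    sqlContext.range(0, 10).write.parquet("/tmp/parquet-legacy")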

Author: Cheng Lian <li...@databricks.com>

Closes #8988 from liancheng/spark-8848/standard-parquet-write-path.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/02149ff0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/02149ff0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/02149ff0

Branch: refs/heads/master
Commit: 02149ff08eed3745086589a047adbce9a580389f
Parents: 2816c89
Author: Cheng Lian <li...@databricks.com>
Authored: Thu Oct 8 16:18:35 2015 -0700
Committer: Cheng Lian <li...@databricks.com>
Committed: Thu Oct 8 16:18:35 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/types/Decimal.scala    |   4 +-
 .../scala/org/apache/spark/sql/SQLConf.scala    |   5 +-
 .../parquet/CatalystReadSupport.scala           |  35 +-
 .../parquet/CatalystRowConverter.scala          |  59 +--
 .../parquet/CatalystSchemaConverter.scala       |  46 +-
 .../parquet/CatalystWriteSupport.scala          | 436 +++++++++++++++++++
 .../parquet/DirectParquetOutputCommitter.scala  |   2 +-
 .../datasources/parquet/ParquetConverter.scala  |  39 --
 .../datasources/parquet/ParquetFilters.scala    |  36 --
 .../datasources/parquet/ParquetRelation.scala   |  42 +-
 .../parquet/ParquetTableSupport.scala           | 321 --------------
 .../parquet/ParquetTypesConverter.scala         | 160 -------
 .../apache/spark/sql/UserDefinedTypeSuite.scala |  32 +-
 .../datasources/parquet/ParquetIOSuite.scala    |  63 +--
 .../datasources/parquet/ParquetQuerySuite.scala |  46 +-
 .../parquet/ParquetSchemaSuite.scala            |   4 +-
 .../datasources/parquet/ParquetTest.scala       |  44 +-
 .../sql/hive/HiveMetastoreCatalogSuite.scala    |  28 +-
 18 files changed, 709 insertions(+), 693 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/02149ff0/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index 909b8e3..c11dab3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -108,7 +108,9 @@ final class Decimal extends Ordered[Decimal] with Serializable {
    */
   def set(decimal: BigDecimal, precision: Int, scale: Int): Decimal = {
     this.decimalVal = decimal.setScale(scale, ROUNDING_MODE)
-    require(decimalVal.precision <= precision, "Overflowed precision")
+    require(
+      decimalVal.precision <= precision,
+      s"Decimal precision ${decimalVal.precision} exceeds max precision $precision")
     this.longVal = 0L
     this._precision = precision
     this._scale = scale
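
As an illustration of the improved error message above (an editor's sketch, not part of this patch; it assumes the public `Decimal(BigDecimal, Int, Int)` factory, which delegates to `set`):

    import org.apache.spark.sql.types.Decimal

    // 123.45 has precision 5, so forcing precision 4 now fails with a message that
    // names both precisions instead of the bare "Overflowed precision":
    Decimal(BigDecimal("123.45"), 4, 2)
    // java.lang.IllegalArgumentException: requirement failed:
    //   Decimal precision 5 exceeds max precision 4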

http://git-wip-us.apache.org/repos/asf/spark/blob/02149ff0/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 8f0f891..47397c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -292,10 +292,9 @@ private[spark] object SQLConf {
 
   val PARQUET_WRITE_LEGACY_FORMAT = booleanConf(
     key = "spark.sql.parquet.writeLegacyFormat",
-    defaultValue = Some(true),
+    defaultValue = Some(false),
     doc = "Whether to follow Parquet's format specification when converting Parquet schema to " +
-      "Spark SQL schema and vice versa.",
-    isPublic = false)
+      "Spark SQL schema and vice versa.")
 
   val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf(
     key = "spark.sql.parquet.output.committer.class",

http://git-wip-us.apache.org/repos/asf/spark/blob/02149ff0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 5325698..a958373 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -95,7 +95,9 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
        """.stripMargin
     }
 
-    new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
+    new CatalystRecordMaterializer(
+      parquetRequestedSchema,
+      CatalystReadSupport.expandUDT(catalystRequestedSchema))
   }
 }
 
@@ -110,7 +112,10 @@ private[parquet] object CatalystReadSupport {
    */
   def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
     val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
-    Types.buildMessage().addFields(clippedParquetFields: _*).named("root")
+    Types
+      .buildMessage()
+      .addFields(clippedParquetFields: _*)
+      .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
   }
 
   private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
@@ -271,4 +276,30 @@ private[parquet] object CatalystReadSupport {
         .getOrElse(toParquet.convertField(f))
     }
   }
+
+  def expandUDT(schema: StructType): StructType = {
+    def expand(dataType: DataType): DataType = {
+      dataType match {
+        case t: ArrayType =>
+          t.copy(elementType = expand(t.elementType))
+
+        case t: MapType =>
+          t.copy(
+            keyType = expand(t.keyType),
+            valueType = expand(t.valueType))
+
+        case t: StructType =>
+          val expandedFields = t.fields.map(f => f.copy(dataType = expand(f.dataType)))
+          t.copy(fields = expandedFields)
+
+        case t: UserDefinedType[_] =>
+          t.sqlType
+
+        case t =>
+          t
+      }
+    }
+
+    expand(schema).asInstanceOf[StructType]
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/02149ff0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index 050d361..247d353 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -27,7 +27,6 @@ import org.apache.parquet.column.Dictionary
 import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
 import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
-import org.apache.parquet.schema.Type.Repetition
 import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}
 
 import org.apache.spark.Logging
@@ -114,7 +113,8 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp
  * any "parent" container.
  *
  * @param parquetType Parquet schema of Parquet records
- * @param catalystType Spark SQL schema that corresponds to the Parquet record type
+ * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
+ *        types should have been expanded.
  * @param updater An updater which propagates converted field values to the parent container
  */
 private[parquet] class CatalystRowConverter(
@@ -133,6 +133,12 @@ private[parquet] class CatalystRowConverter(
        |${catalystType.prettyJson}
      """.stripMargin)
 
+  assert(
+    !catalystType.existsRecursively(_.isInstanceOf[UserDefinedType[_]]),
+    s"""User-defined types in Catalyst schema should have already been expanded:
+       |${catalystType.prettyJson}
+     """.stripMargin)
+
   logDebug(
     s"""Building row converter for the following schema:
        |
@@ -268,13 +274,6 @@ private[parquet] class CatalystRowConverter(
           override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
         })
 
-      case t: UserDefinedType[_] =>
-        val catalystTypeForUDT = t.sqlType
-        val nullable = parquetType.isRepetition(Repetition.OPTIONAL)
-        val field = StructField("udt", catalystTypeForUDT, nullable)
-        val parquetTypeForUDT = new CatalystSchemaConverter().convertField(field)
-        newConverter(parquetTypeForUDT, catalystTypeForUDT, updater)
-
       case _ =>
         throw new RuntimeException(
           s"Unable to create Parquet converter for data type ${catalystType.json}")
@@ -340,30 +339,36 @@ private[parquet] class CatalystRowConverter(
       val scale = decimalType.scale
 
       if (precision <= CatalystSchemaConverter.MAX_PRECISION_FOR_INT64) {
-        // Constructs a `Decimal` with an unscaled `Long` value if possible.  The underlying
-        // `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here we are using
-        // `Binary.toByteBuffer.array()` to steal the underlying byte array without copying it.
-        val buffer = value.toByteBuffer
-        val bytes = buffer.array()
-        val start = buffer.position()
-        val end = buffer.limit()
-
-        var unscaled = 0L
-        var i = start
-
-        while (i < end) {
-          unscaled = (unscaled << 8) | (bytes(i) & 0xff)
-          i += 1
-        }
-
-        val bits = 8 * (end - start)
-        unscaled = (unscaled << (64 - bits)) >> (64 - bits)
+        // Constructs a `Decimal` with an unscaled `Long` value if possible.
+        val unscaled = binaryToUnscaledLong(value)
         Decimal(unscaled, precision, scale)
       } else {
         // Otherwise, resorts to an unscaled `BigInteger` instead.
         Decimal(new BigDecimal(new BigInteger(value.getBytes), scale), precision, scale)
       }
     }
+
+    private def binaryToUnscaledLong(binary: Binary): Long = {
+      // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here
+      // we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without
+      // copying it.
+      val buffer = binary.toByteBuffer
+      val bytes = buffer.array()
+      val start = buffer.position()
+      val end = buffer.limit()
+
+      var unscaled = 0L
+      var i = start
+
+      while (i < end) {
+        unscaled = (unscaled << 8) | (bytes(i) & 0xff)
+        i += 1
+      }
+
+      val bits = 8 * (end - start)
+      unscaled = (unscaled << (64 - bits)) >> (64 - bits)
+      unscaled
+    }
   }
 
   /**
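
A standalone sketch of the sign-extension trick used by `binaryToUnscaledLong`, operating on a plain byte array instead of a Parquet `Binary` (an editor's illustration; the two bytes below encode a decimal with unscaled value -123):

    object UnscaledLongSketch {
      // Same logic as binaryToUnscaledLong: accumulate big-endian bytes into the low
      // end of a Long, then shift left and arithmetically right to sign-extend.
      def toUnscaledLong(bytes: Array[Byte]): Long = {
        var unscaled = 0L
        var i = 0
        while (i < bytes.length) {
          unscaled = (unscaled << 8) | (bytes(i) & 0xff)
          i += 1
        }
        val bits = 8 * bytes.length
        (unscaled << (64 - bits)) >> (64 - bits)
      }

      def main(args: Array[String]): Unit = {
        // -123 in two's complement, stored in a two-byte FIXED_LEN_BYTE_ARRAY
        val bytes = Array(0xFF.toByte, 0x85.toByte)
        println(toUnscaledLong(bytes))  // -123
      }
    }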

http://git-wip-us.apache.org/repos/asf/spark/blob/02149ff0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 6904fc7..7f3394c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -121,7 +121,7 @@ private[parquet] class CatalystSchemaConverter(
       val precision = field.getDecimalMetadata.getPrecision
       val scale = field.getDecimalMetadata.getScale
 
-      CatalystSchemaConverter.analysisRequire(
+      CatalystSchemaConverter.checkConversionRequirement(
         maxPrecision == -1 || 1 <= precision && precision <= maxPrecision,
         s"Invalid decimal precision: $typeName cannot store $precision digits (max $maxPrecision)")
 
@@ -155,7 +155,7 @@ private[parquet] class CatalystSchemaConverter(
         }
 
       case INT96 =>
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           assumeInt96IsTimestamp,
           "INT96 is not supported unless it's interpreted as timestamp. " +
             s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.")
@@ -197,11 +197,11 @@ private[parquet] class CatalystSchemaConverter(
       //
       // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
       case LIST =>
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           field.getFieldCount == 1, s"Invalid list type $field")
 
         val repeatedType = field.getType(0)
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           repeatedType.isRepetition(REPEATED), s"Invalid list type $field")
 
         if (isElementType(repeatedType, field.getName)) {
@@ -217,17 +217,17 @@ private[parquet] class CatalystSchemaConverter(
       // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
       // scalastyle:on
       case MAP | MAP_KEY_VALUE =>
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           field.getFieldCount == 1 && !field.getType(0).isPrimitive,
           s"Invalid map type: $field")
 
         val keyValueType = field.getType(0).asGroupType()
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           keyValueType.isRepetition(REPEATED) && keyValueType.getFieldCount == 2,
           s"Invalid map type: $field")
 
         val keyType = keyValueType.getType(0)
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           keyType.isPrimitive,
           s"Map key type is expected to be a primitive type, but found: $keyType")
 
@@ -299,7 +299,10 @@ private[parquet] class CatalystSchemaConverter(
    * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]].
    */
   def convert(catalystSchema: StructType): MessageType = {
-    Types.buildMessage().addFields(catalystSchema.map(convertField): _*).named("root")
+    Types
+      .buildMessage()
+      .addFields(catalystSchema.map(convertField): _*)
+      .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
   }
 
   /**
@@ -347,10 +350,10 @@ private[parquet] class CatalystSchemaConverter(
       // NOTE: Spark SQL TimestampType is NOT a well defined type in Parquet format spec.
       //
       // As stated in PARQUET-323, Parquet `INT96` was originally introduced to represent nanosecond
-      // timestamp in Impala for some historical reasons, it's not recommended to be used for any
-      // other types and will probably be deprecated in future Parquet format spec.  That's the
-      // reason why Parquet format spec only defines `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS` which
-      // are both logical types annotating `INT64`.
+      // timestamp in Impala for some historical reasons.  It's not recommended to be used for any
+      // other types and will probably be deprecated in some future version of parquet-format spec.
+      // That's the reason why parquet-format spec only defines `TIMESTAMP_MILLIS` and
+      // `TIMESTAMP_MICROS` which are both logical types annotating `INT64`.
       //
       // Originally, Spark SQL uses the same nanosecond timestamp type as Impala and Hive.  Starting
       // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store
@@ -361,7 +364,7 @@ private[parquet] class CatalystSchemaConverter(
       // currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using)
       // hasn't implemented `TIMESTAMP_MICROS` yet.
       //
-      // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
+      // TODO Converts `TIMESTAMP_MICROS` once parquet-mr implements that.
       case TimestampType =>
         Types.primitive(INT96, repetition).named(field.name)
 
@@ -523,11 +526,12 @@ private[parquet] class CatalystSchemaConverter(
   }
 }
 
-
 private[parquet] object CatalystSchemaConverter {
+  val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"
+
   def checkFieldName(name: String): Unit = {
     // ,;{}()\n\t= and space are special characters in Parquet schema
-    analysisRequire(
+    checkConversionRequirement(
       !name.matches(".*[ ,;{}()\n\t=].*"),
       s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=".
          |Please use alias to rename it.
@@ -539,7 +543,7 @@ private[parquet] object CatalystSchemaConverter {
     schema
   }
 
-  def analysisRequire(f: => Boolean, message: String): Unit = {
+  def checkConversionRequirement(f: => Boolean, message: String): Unit = {
     if (!f) {
       throw new AnalysisException(message)
     }
@@ -553,16 +557,8 @@ private[parquet] object CatalystSchemaConverter {
     numBytes
   }
 
-  private val MIN_BYTES_FOR_PRECISION = Array.tabulate[Int](39)(computeMinBytesForPrecision)
-
   // Returns the minimum number of bytes needed to store a decimal with a given `precision`.
-  def minBytesForPrecision(precision : Int) : Int = {
-    if (precision < MIN_BYTES_FOR_PRECISION.length) {
-      MIN_BYTES_FOR_PRECISION(precision)
-    } else {
-      computeMinBytesForPrecision(precision)
-    }
-  }
+  val minBytesForPrecision = Array.tabulate[Int](39)(computeMinBytesForPrecision)
 
   val MAX_PRECISION_FOR_INT32 = maxPrecisionForBytes(4) /* 9 */
 

http://git-wip-us.apache.org/repos/asf/spark/blob/02149ff0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
new file mode 100644
index 0000000..483363d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.nio.{ByteBuffer, ByteOrder}
+import java.util
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.column.ParquetProperties
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
+import org.apache.parquet.io.api.{Binary, RecordConsumer}
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.{MAX_PRECISION_FOR_INT32, MAX_PRECISION_FOR_INT64, minBytesForPrecision}
+import org.apache.spark.sql.types._
+
+/**
+ * A Parquet [[WriteSupport]] implementation that writes Catalyst [[InternalRow]]s as Parquet
+ * messages.  This class can write Parquet data in two modes:
+ *
+ *  - Standard mode: Parquet data are written in standard format defined in parquet-format spec.
+ *  - Legacy mode: Parquet data are written in legacy format compatible with Spark 1.4 and prior.
+ *
+ * This behavior can be controlled by SQL option `spark.sql.parquet.writeLegacyFormat`.  The value
+ * of this option is propagated to this class by the `init()` method and its Hadoop configuration
+ * argument.
+ */
+private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] with Logging {
+  // A `ValueWriter` is responsible for writing a field of an `InternalRow` to the record consumer.
+  // Here we are using `SpecializedGetters` rather than `InternalRow` so that we can directly access
+  // data in `ArrayData` without the help of `SpecificMutableRow`.
+  private type ValueWriter = (SpecializedGetters, Int) => Unit
+
+  // Schema of the `InternalRow`s to be written
+  private var schema: StructType = _
+
+  // `ValueWriter`s for all fields of the schema
+  private var rootFieldWriters: Seq[ValueWriter] = _
+
+  // The Parquet `RecordConsumer` to which all `InternalRow`s are written
+  private var recordConsumer: RecordConsumer = _
+
+  // Whether to write data in legacy Parquet format compatible with Spark 1.4 and prior versions
+  private var writeLegacyParquetFormat: Boolean = _
+
+  // Reusable byte array used to write timestamps as Parquet INT96 values
+  private val timestampBuffer = new Array[Byte](12)
+
+  // Reusable byte array used to write decimal values
+  private val decimalBuffer = new Array[Byte](minBytesForPrecision(DecimalType.MAX_PRECISION))
+
+  override def init(configuration: Configuration): WriteContext = {
+    val schemaString = configuration.get(CatalystWriteSupport.SPARK_ROW_SCHEMA)
+    this.schema = StructType.fromString(schemaString)
+    this.writeLegacyParquetFormat = {
+      // `SQLConf.PARQUET_WRITE_LEGACY_FORMAT` should always be explicitly set in ParquetRelation
+      assert(configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key) != null)
+      configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean
+    }
+    this.rootFieldWriters = schema.map(_.dataType).map(makeWriter)
+
+    val messageType = new CatalystSchemaConverter(configuration).convert(schema)
+    val metadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> schemaString).asJava
+
+    logInfo(
+      s"""Initialized Parquet WriteSupport with Catalyst schema:
+         |${schema.prettyJson}
+         |and corresponding Parquet message type:
+         |$messageType
+       """.stripMargin)
+
+    new WriteContext(messageType, metadata)
+  }
+
+  override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
+    this.recordConsumer = recordConsumer
+  }
+
+  override def write(row: InternalRow): Unit = {
+    consumeMessage {
+      writeFields(row, schema, rootFieldWriters)
+    }
+  }
+
+  private def writeFields(
+      row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = {
+    var i = 0
+    while (i < row.numFields) {
+      if (!row.isNullAt(i)) {
+        consumeField(schema(i).name, i) {
+          fieldWriters(i).apply(row, i)
+        }
+      }
+      i += 1
+    }
+  }
+
+  private def makeWriter(dataType: DataType): ValueWriter = {
+    dataType match {
+      case BooleanType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addBoolean(row.getBoolean(ordinal))
+
+      case ByteType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addInteger(row.getByte(ordinal))
+
+      case ShortType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addInteger(row.getShort(ordinal))
+
+      case IntegerType | DateType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addInteger(row.getInt(ordinal))
+
+      case LongType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addLong(row.getLong(ordinal))
+
+      case FloatType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addFloat(row.getFloat(ordinal))
+
+      case DoubleType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addDouble(row.getDouble(ordinal))
+
+      case StringType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addBinary(Binary.fromByteArray(row.getUTF8String(ordinal).getBytes))
+
+      case TimestampType =>
+        (row: SpecializedGetters, ordinal: Int) => {
+          // TODO Writes `TimestampType` values as `TIMESTAMP_MICROS` once parquet-mr implements it
+          // Currently we only support timestamps stored as INT96, which is compatible with Hive
+          // and Impala.  However, INT96 is to be deprecated.  We plan to support `TIMESTAMP_MICROS`
+          // defined in the parquet-format spec.  But as of this writing, the most recent parquet-mr
+          // version (1.8.1) hasn't implemented it yet.
+
+          // NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond
+          // precision.  Nanosecond parts of timestamp values read from INT96 are simply stripped.
+          val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
+          val buf = ByteBuffer.wrap(timestampBuffer)
+          buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
+          recordConsumer.addBinary(Binary.fromByteArray(timestampBuffer))
+        }
+
+      case BinaryType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addBinary(Binary.fromByteArray(row.getBinary(ordinal)))
+
+      case DecimalType.Fixed(precision, scale) =>
+        makeDecimalWriter(precision, scale)
+
+      case t: StructType =>
+        val fieldWriters = t.map(_.dataType).map(makeWriter)
+        (row: SpecializedGetters, ordinal: Int) =>
+          consumeGroup {
+            writeFields(row.getStruct(ordinal, t.length), t, fieldWriters)
+          }
+
+      case t: ArrayType => makeArrayWriter(t)
+
+      case t: MapType => makeMapWriter(t)
+
+      case t: UserDefinedType[_] => makeWriter(t.sqlType)
+
+      // TODO Adds IntervalType support
+      case _ => sys.error(s"Unsupported data type $dataType.")
+    }
+  }
+
+  private def makeDecimalWriter(precision: Int, scale: Int): ValueWriter = {
+    assert(
+      precision <= DecimalType.MAX_PRECISION,
+      s"Decimal precision $precision exceeds max precision ${DecimalType.MAX_PRECISION}")
+
+    val numBytes = minBytesForPrecision(precision)
+
+    val int32Writer =
+      (row: SpecializedGetters, ordinal: Int) => {
+        val unscaledLong = row.getDecimal(ordinal, precision, scale).toUnscaledLong
+        recordConsumer.addInteger(unscaledLong.toInt)
+      }
+
+    val int64Writer =
+      (row: SpecializedGetters, ordinal: Int) => {
+        val unscaledLong = row.getDecimal(ordinal, precision, scale).toUnscaledLong
+        recordConsumer.addLong(unscaledLong)
+      }
+
+    val binaryWriterUsingUnscaledLong =
+      (row: SpecializedGetters, ordinal: Int) => {
+        // When the precision is low enough (<= 18) to squeeze the decimal value into a `Long`, we
+        // can build a fixed-length byte array with length `numBytes` using the unscaled `Long`
+        // value and the `decimalBuffer` for better performance.
+        val unscaled = row.getDecimal(ordinal, precision, scale).toUnscaledLong
+        var i = 0
+        var shift = 8 * (numBytes - 1)
+
+        while (i < numBytes) {
+          decimalBuffer(i) = (unscaled >> shift).toByte
+          i += 1
+          shift -= 8
+        }
+
+        recordConsumer.addBinary(Binary.fromByteArray(decimalBuffer, 0, numBytes))
+      }
+
+    val binaryWriterUsingUnscaledBytes =
+      (row: SpecializedGetters, ordinal: Int) => {
+        val decimal = row.getDecimal(ordinal, precision, scale)
+        val bytes = decimal.toJavaBigDecimal.unscaledValue().toByteArray
+        val fixedLengthBytes = if (bytes.length == numBytes) {
+          // If the length of the underlying byte array of the unscaled `BigInteger` happens to be
+          // `numBytes`, just reuse it, so that we don't bother copying it to `decimalBuffer`.
+          bytes
+        } else {
+          // Otherwise, the length must be less than `numBytes`.  In this case we copy contents of
+          // the underlying bytes with padding sign bytes to `decimalBuffer` to form the result
+          // fixed-length byte array.
+          val signByte = if (bytes.head < 0) -1: Byte else 0: Byte
+          util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, signByte)
+          System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, bytes.length)
+          decimalBuffer
+        }
+
+        recordConsumer.addBinary(Binary.fromByteArray(fixedLengthBytes, 0, numBytes))
+      }
+
+    writeLegacyParquetFormat match {
+      // Standard mode, 1 <= precision <= 9, writes as INT32
+      case false if precision <= MAX_PRECISION_FOR_INT32 => int32Writer
+
+      // Standard mode, 10 <= precision <= 18, writes as INT64
+      case false if precision <= MAX_PRECISION_FOR_INT64 => int64Writer
+
+      // Legacy mode, 1 <= precision <= 18, writes as FIXED_LEN_BYTE_ARRAY
+      case true if precision <= MAX_PRECISION_FOR_INT64 => binaryWriterUsingUnscaledLong
+
+      // Either standard or legacy mode, 19 <= precision <= 38, writes as FIXED_LEN_BYTE_ARRAY
+      case _ => binaryWriterUsingUnscaledBytes
+    }
+  }
+
+  def makeArrayWriter(arrayType: ArrayType): ValueWriter = {
+    val elementWriter = makeWriter(arrayType.elementType)
+
+    def threeLevelArrayWriter(repeatedGroupName: String, elementFieldName: String): ValueWriter =
+      (row: SpecializedGetters, ordinal: Int) => {
+        val array = row.getArray(ordinal)
+        consumeGroup {
+          // Only creates the repeated field if the array is non-empty.
+          if (array.numElements() > 0) {
+            consumeField(repeatedGroupName, 0) {
+              var i = 0
+              while (i < array.numElements()) {
+                consumeGroup {
+                  // Only creates the element field if the current array element is not null.
+                  if (!array.isNullAt(i)) {
+                    consumeField(elementFieldName, 0) {
+                      elementWriter.apply(array, i)
+                    }
+                  }
+                }
+                i += 1
+              }
+            }
+          }
+        }
+      }
+
+    def twoLevelArrayWriter(repeatedFieldName: String): ValueWriter =
+      (row: SpecializedGetters, ordinal: Int) => {
+        val array = row.getArray(ordinal)
+        consumeGroup {
+          // Only creates the repeated field if the array is non-empty.
+          if (array.numElements() > 0) {
+            consumeField(repeatedFieldName, 0) {
+              var i = 0
+              while (i < array.numElements()) {
+                elementWriter.apply(array, i)
+                i += 1
+              }
+            }
+          }
+        }
+      }
+
+    (writeLegacyParquetFormat, arrayType.containsNull) match {
+      case (legacyMode @ false, _) =>
+        // Standard mode:
+        //
+        //   <list-repetition> group <name> (LIST) {
+        //     repeated group list {
+        //                    ^~~~  repeatedGroupName
+        //       <element-repetition> <element-type> element;
+        //                                           ^~~~~~~  elementFieldName
+        //     }
+        //   }
+        threeLevelArrayWriter(repeatedGroupName = "list", elementFieldName = "element")
+
+      case (legacyMode @ true, nullableElements @ true) =>
+        // Legacy mode, with nullable elements:
+        //
+        //   <list-repetition> group <name> (LIST) {
+        //     optional group bag {
+        //                    ^~~  repeatedGroupName
+        //       repeated <element-type> array;
+        //                               ^~~~~ elementFieldName
+        //     }
+        //   }
+        threeLevelArrayWriter(repeatedGroupName = "bag", elementFieldName = "array")
+
+      case (legacyMode @ true, nullableElements @ false) =>
+        // Legacy mode, with non-nullable elements:
+        //
+        //   <list-repetition> group <name> (LIST) {
+        //     repeated <element-type> array;
+        //                             ^~~~~  repeatedFieldName
+        //   }
+        twoLevelArrayWriter(repeatedFieldName = "array")
+    }
+  }
+
+  private def makeMapWriter(mapType: MapType): ValueWriter = {
+    val keyWriter = makeWriter(mapType.keyType)
+    val valueWriter = makeWriter(mapType.valueType)
+    val repeatedGroupName = if (writeLegacyParquetFormat) {
+      // Legacy mode:
+      //
+      //   <map-repetition> group <name> (MAP) {
+      //     repeated group map (MAP_KEY_VALUE) {
+      //                    ^~~  repeatedGroupName
+      //       required <key-type> key;
+      //       <value-repetition> <value-type> value;
+      //     }
+      //   }
+      "map"
+    } else {
+      // Standard mode:
+      //
+      //   <map-repetition> group <name> (MAP) {
+      //     repeated group key_value {
+      //                    ^~~~~~~~~  repeatedGroupName
+      //       required <key-type> key;
+      //       <value-repetition> <value-type> value;
+      //     }
+      //   }
+      "key_value"
+    }
+
+    (row: SpecializedGetters, ordinal: Int) => {
+      val map = row.getMap(ordinal)
+      val keyArray = map.keyArray()
+      val valueArray = map.valueArray()
+
+      consumeGroup {
+        // Only creates the repeated field if the map is non-empty.
+        if (map.numElements() > 0) {
+          consumeField(repeatedGroupName, 0) {
+            var i = 0
+            while (i < map.numElements()) {
+              consumeGroup {
+                consumeField("key", 0) {
+                  keyWriter.apply(keyArray, i)
+                }
+
+                // Only creates the "value" field if the current map value is not null
+                if (!map.valueArray().isNullAt(i)) {
+                  consumeField("value", 1) {
+                    valueWriter.apply(valueArray, i)
+                  }
+                }
+              }
+              i += 1
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private def consumeMessage(f: => Unit): Unit = {
+    recordConsumer.startMessage()
+    f
+    recordConsumer.endMessage()
+  }
+
+  private def consumeGroup(f: => Unit): Unit = {
+    recordConsumer.startGroup()
+    f
+    recordConsumer.endGroup()
+  }
+
+  private def consumeField(field: String, index: Int)(f: => Unit): Unit = {
+    recordConsumer.startField(field, index)
+    f
+    recordConsumer.endField(field, index)
+  }
+}
+
+private[parquet] object CatalystWriteSupport {
+  val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"
+
+  def setSchema(schema: StructType, configuration: Configuration): Unit = {
+    schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
+    configuration.set(SPARK_ROW_SCHEMA, schema.json)
+    configuration.set(
+      ParquetOutputFormat.WRITER_VERSION,
+      ParquetProperties.WriterVersion.PARQUET_1_0.toString)
+  }
+}
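
A simplified, standalone sketch of the "prebuilt composed writer" pattern used above (an editor's illustration with stand-in types, not the actual Spark classes): writers are composed once per schema in `makeWriter`-style code, so the per-row hot loop performs no data type dispatching.

    object ComposedWritersSketch {
      sealed trait FieldType
      case object IntField extends FieldType
      case object StringField extends FieldType

      // Analogous to CatalystWriteSupport's (SpecializedGetters, Int) => Unit
      type ValueWriter = (Seq[Any], Int) => Unit

      // Pattern matching on the data type happens here, once per field
      def makeWriter(fieldType: FieldType): ValueWriter = fieldType match {
        case IntField    => (row, ordinal) => println(s"addInteger(${row(ordinal)})")
        case StringField => (row, ordinal) => println(s"addBinary(${row(ordinal)})")
      }

      def main(args: Array[String]): Unit = {
        val schema = Seq(IntField, StringField)
        val fieldWriters: Seq[ValueWriter] = schema.map(makeWriter)

        // Hot loop: only prebuilt closures are invoked, no per-value dispatching
        for (row <- Seq(Seq(1, "a"), Seq(2, "b"))) {
          var i = 0
          while (i < row.length) {
            fieldWriters(i)(row, i)
            i += 1
          }
        }
      }
    }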

http://git-wip-us.apache.org/repos/asf/spark/blob/02149ff0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
index de1fd01..300e867 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
@@ -39,7 +39,7 @@ import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetO
  *
  *   NEVER use [[DirectParquetOutputCommitter]] when appending data, because currently there's
  *   no safe way undo a failed appending job (that's why both `abortTask()` and `abortJob()` are
- *   left * empty).
+ *   left empty).
  */
 private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
   extends ParquetOutputCommitter(outputPath, context) {

http://git-wip-us.apache.org/repos/asf/spark/blob/02149ff0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala
deleted file mode 100644
index ccd7ebf..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.{MapData, ArrayData}
-
-// TODO Removes this while fixing SPARK-8848
-private[sql] object CatalystConverter {
-  // This is mostly Parquet convention (see, e.g., `ConversionPatterns`).
-  // Note that "array" for the array elements is chosen by ParquetAvro.
-  // Using a different value will result in Parquet silently dropping columns.
-  val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag"
-  val ARRAY_ELEMENTS_SCHEMA_NAME = "array"
-
-  val MAP_KEY_SCHEMA_NAME = "key"
-  val MAP_VALUE_SCHEMA_NAME = "value"
-  val MAP_SCHEMA_NAME = "map"
-
-  // TODO: consider using Array[T] for arrays to avoid boxing of primitive types
-  type ArrayScalaType = ArrayData
-  type StructScalaType = InternalRow
-  type MapScalaType = MapData
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/02149ff0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index c6b3fe7..78040d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -18,24 +18,17 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.Serializable
-import java.nio.ByteBuffer
 
-import com.google.common.io.BaseEncoding
-import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.io.api.Binary
 import org.apache.parquet.schema.OriginalType
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 
-import org.apache.spark.SparkEnv
-import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
 
 private[sql] object ParquetFilters {
-  val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
-
   case class SetInFilter[T <: Comparable[T]](
     valueSet: Set[T]) extends UserDefinedPredicate[T] with Serializable {
 
@@ -282,33 +275,4 @@ private[sql] object ParquetFilters {
     addMethod.setAccessible(true)
     addMethod.invoke(null, classOf[Binary], enumTypeDescriptor)
   }
-
-  /**
-   * Note: Inside the Hadoop API we only have access to `Configuration`, not to
-   * [[org.apache.spark.SparkContext]], so we cannot use broadcasts to convey
-   * the actual filter predicate.
-   */
-  def serializeFilterExpressions(filters: Seq[Expression], conf: Configuration): Unit = {
-    if (filters.nonEmpty) {
-      val serialized: Array[Byte] =
-        SparkEnv.get.closureSerializer.newInstance().serialize(filters).array()
-      val encoded: String = BaseEncoding.base64().encode(serialized)
-      conf.set(PARQUET_FILTER_DATA, encoded)
-    }
-  }
-
-  /**
-   * Note: Inside the Hadoop API we only have access to `Configuration`, not to
-   * [[org.apache.spark.SparkContext]], so we cannot use broadcasts to convey
-   * the actual filter predicate.
-   */
-  def deserializeFilterExpressions(conf: Configuration): Seq[Expression] = {
-    val data = conf.get(PARQUET_FILTER_DATA)
-    if (data != null) {
-      val decoded: Array[Byte] = BaseEncoding.base64().decode(data)
-      SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(decoded))
-    } else {
-      Seq()
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/02149ff0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 8a9c0e7..77d851c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -218,8 +218,8 @@ private[sql] class ParquetRelation(
     }
 
     // SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible
-    val committerClassname = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key)
-    if (committerClassname == "org.apache.spark.sql.parquet.DirectParquetOutputCommitter") {
+    val committerClassName = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key)
+    if (committerClassName == "org.apache.spark.sql.parquet.DirectParquetOutputCommitter") {
       conf.set(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
         classOf[DirectParquetOutputCommitter].getCanonicalName)
     }
@@ -248,18 +248,22 @@ private[sql] class ParquetRelation(
     // bundled with `ParquetOutputFormat[Row]`.
     job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
 
-    // TODO There's no need to use two kinds of WriteSupport
-    // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
-    // complex types.
-    val writeSupportClass =
-      if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
-        classOf[MutableRowWriteSupport]
-      } else {
-        classOf[RowWriteSupport]
-      }
+    ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport])
+    CatalystWriteSupport.setSchema(dataSchema, conf)
+
+    // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
+    // and `CatalystWriteSupport` (writing actual rows to Parquet files).
+    conf.set(
+      SQLConf.PARQUET_BINARY_AS_STRING.key,
+      sqlContext.conf.isParquetBinaryAsString.toString)
 
-    ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)
-    RowWriteSupport.setSchema(dataSchema.toAttributes, conf)
+    conf.set(
+      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      sqlContext.conf.isParquetINT96AsTimestamp.toString)
+
+    conf.set(
+      SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
+      sqlContext.conf.writeLegacyParquetFormat.toString)
 
     // Sets compression scheme
     conf.set(
@@ -287,7 +291,6 @@ private[sql] class ParquetRelation(
     val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
     val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
-    val writeLegacyParquetFormat = sqlContext.conf.writeLegacyParquetFormat
 
     // Parquet row group size. We will use this value as the value for
     // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
@@ -304,8 +307,7 @@ private[sql] class ParquetRelation(
         useMetadataCache,
         parquetFilterPushDown,
         assumeBinaryIsString,
-        assumeInt96IsTimestamp,
-        writeLegacyParquetFormat) _
+        assumeInt96IsTimestamp) _
 
     // Create the function to set input paths at the driver side.
     val setInputPaths =
@@ -530,8 +532,7 @@ private[sql] object ParquetRelation extends Logging {
       useMetadataCache: Boolean,
       parquetFilterPushDown: Boolean,
       assumeBinaryIsString: Boolean,
-      assumeInt96IsTimestamp: Boolean,
-      writeLegacyParquetFormat: Boolean)(job: Job): Unit = {
+      assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
     val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
     conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
 
@@ -552,16 +553,15 @@ private[sql] object ParquetRelation extends Logging {
     })
 
     conf.set(
-      RowWriteSupport.SPARK_ROW_SCHEMA,
+      CatalystWriteSupport.SPARK_ROW_SCHEMA,
       CatalystSchemaConverter.checkFieldNames(dataSchema).json)
 
     // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
     conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
 
-    // Sets flags for Parquet schema conversion
+    // Sets flags for `CatalystSchemaConverter`
     conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
     conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
-    conf.setBoolean(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, writeLegacyParquetFormat)
 
     overrideMinSplitSize(parquetBlockSize, conf)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/02149ff0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala
deleted file mode 100644
index ed89aa2..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala
+++ /dev/null
@@ -1,321 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet
-
-import java.math.BigInteger
-import java.nio.{ByteBuffer, ByteOrder}
-import java.util.{HashMap => JHashMap}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.column.ParquetProperties
-import org.apache.parquet.hadoop.ParquetOutputFormat
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.io.api._
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-
-/**
- * A `parquet.hadoop.api.WriteSupport` for Row objects.
- */
-private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Logging {
-
-  private[parquet] var writer: RecordConsumer = null
-  private[parquet] var attributes: Array[Attribute] = null
-
-  override def init(configuration: Configuration): WriteSupport.WriteContext = {
-    val origAttributesStr: String = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
-    val metadata = new JHashMap[String, String]()
-    metadata.put(CatalystReadSupport.SPARK_METADATA_KEY, origAttributesStr)
-
-    if (attributes == null) {
-      attributes = ParquetTypesConverter.convertFromString(origAttributesStr).toArray
-    }
-
-    log.debug(s"write support initialized for requested schema $attributes")
-    new WriteSupport.WriteContext(ParquetTypesConverter.convertFromAttributes(attributes), metadata)
-  }
-
-  override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
-    writer = recordConsumer
-    log.debug(s"preparing for write with schema $attributes")
-  }
-
-  override def write(record: InternalRow): Unit = {
-    val attributesSize = attributes.size
-    if (attributesSize > record.numFields) {
-      throw new IndexOutOfBoundsException("Trying to write more fields than contained in row " +
-        s"($attributesSize > ${record.numFields})")
-    }
-
-    var index = 0
-    writer.startMessage()
-    while(index < attributesSize) {
-      // null values indicate optional fields but we do not check currently
-      if (!record.isNullAt(index)) {
-        writer.startField(attributes(index).name, index)
-        writeValue(attributes(index).dataType, record.get(index, attributes(index).dataType))
-        writer.endField(attributes(index).name, index)
-      }
-      index = index + 1
-    }
-    writer.endMessage()
-  }
-
-  private[parquet] def writeValue(schema: DataType, value: Any): Unit = {
-    if (value != null) {
-      schema match {
-        case t: UserDefinedType[_] => writeValue(t.sqlType, value)
-        case t @ ArrayType(_, _) => writeArray(
-          t,
-          value.asInstanceOf[CatalystConverter.ArrayScalaType])
-        case t @ MapType(_, _, _) => writeMap(
-          t,
-          value.asInstanceOf[CatalystConverter.MapScalaType])
-        case t @ StructType(_) => writeStruct(
-          t,
-          value.asInstanceOf[CatalystConverter.StructScalaType])
-        case _ => writePrimitive(schema.asInstanceOf[AtomicType], value)
-      }
-    }
-  }
-
-  private[parquet] def writePrimitive(schema: DataType, value: Any): Unit = {
-    if (value != null) {
-      schema match {
-        case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean])
-        case ByteType => writer.addInteger(value.asInstanceOf[Byte])
-        case ShortType => writer.addInteger(value.asInstanceOf[Short])
-        case IntegerType | DateType => writer.addInteger(value.asInstanceOf[Int])
-        case LongType => writer.addLong(value.asInstanceOf[Long])
-        case TimestampType => writeTimestamp(value.asInstanceOf[Long])
-        case FloatType => writer.addFloat(value.asInstanceOf[Float])
-        case DoubleType => writer.addDouble(value.asInstanceOf[Double])
-        case StringType => writer.addBinary(
-          Binary.fromByteArray(value.asInstanceOf[UTF8String].getBytes))
-        case BinaryType => writer.addBinary(
-          Binary.fromByteArray(value.asInstanceOf[Array[Byte]]))
-        case DecimalType.Fixed(precision, _) =>
-          writeDecimal(value.asInstanceOf[Decimal], precision)
-        case _ => sys.error(s"Do not know how to writer $schema to consumer")
-      }
-    }
-  }
-
-  private[parquet] def writeStruct(
-      schema: StructType,
-      struct: CatalystConverter.StructScalaType): Unit = {
-    if (struct != null) {
-      val fields = schema.fields.toArray
-      writer.startGroup()
-      var i = 0
-      while(i < fields.length) {
-        if (!struct.isNullAt(i)) {
-          writer.startField(fields(i).name, i)
-          writeValue(fields(i).dataType, struct.get(i, fields(i).dataType))
-          writer.endField(fields(i).name, i)
-        }
-        i = i + 1
-      }
-      writer.endGroup()
-    }
-  }
-
-  private[parquet] def writeArray(
-      schema: ArrayType,
-      array: CatalystConverter.ArrayScalaType): Unit = {
-    val elementType = schema.elementType
-    writer.startGroup()
-    if (array.numElements() > 0) {
-      if (schema.containsNull) {
-        writer.startField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0)
-        var i = 0
-        while (i < array.numElements()) {
-          writer.startGroup()
-          if (!array.isNullAt(i)) {
-            writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
-            writeValue(elementType, array.get(i, elementType))
-            writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
-          }
-          writer.endGroup()
-          i = i + 1
-        }
-        writer.endField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0)
-      } else {
-        writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
-        var i = 0
-        while (i < array.numElements()) {
-          writeValue(elementType, array.get(i, elementType))
-          i = i + 1
-        }
-        writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
-      }
-    }
-    writer.endGroup()
-  }
-
-  private[parquet] def writeMap(
-      schema: MapType,
-      map: CatalystConverter.MapScalaType): Unit = {
-    writer.startGroup()
-    val length = map.numElements()
-    if (length > 0) {
-      writer.startField(CatalystConverter.MAP_SCHEMA_NAME, 0)
-      map.foreach(schema.keyType, schema.valueType, (key, value) => {
-        writer.startGroup()
-        writer.startField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
-        writeValue(schema.keyType, key)
-        writer.endField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
-        if (value != null) {
-          writer.startField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
-          writeValue(schema.valueType, value)
-          writer.endField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
-        }
-        writer.endGroup()
-      })
-      writer.endField(CatalystConverter.MAP_SCHEMA_NAME, 0)
-    }
-    writer.endGroup()
-  }
-
-  // Scratch array used to write decimals as fixed-length byte array
-  private[this] var reusableDecimalBytes = new Array[Byte](16)
-
-  private[parquet] def writeDecimal(decimal: Decimal, precision: Int): Unit = {
-    val numBytes = CatalystSchemaConverter.minBytesForPrecision(precision)
-
-    def longToBinary(unscaled: Long): Binary = {
-      var i = 0
-      var shift = 8 * (numBytes - 1)
-      while (i < numBytes) {
-        reusableDecimalBytes(i) = (unscaled >> shift).toByte
-        i += 1
-        shift -= 8
-      }
-      Binary.fromByteArray(reusableDecimalBytes, 0, numBytes)
-    }
-
-    def bigIntegerToBinary(unscaled: BigInteger): Binary = {
-      unscaled.toByteArray match {
-        case bytes if bytes.length == numBytes =>
-          Binary.fromByteArray(bytes)
-
-        case bytes if bytes.length <= reusableDecimalBytes.length =>
-          val signedByte = (if (bytes.head < 0) -1 else 0).toByte
-          java.util.Arrays.fill(reusableDecimalBytes, 0, numBytes - bytes.length, signedByte)
-          System.arraycopy(bytes, 0, reusableDecimalBytes, numBytes - bytes.length, bytes.length)
-          Binary.fromByteArray(reusableDecimalBytes, 0, numBytes)
-
-        case bytes =>
-          reusableDecimalBytes = new Array[Byte](bytes.length)
-          bigIntegerToBinary(unscaled)
-      }
-    }
-
-    val binary = if (numBytes <= 8) {
-      longToBinary(decimal.toUnscaledLong)
-    } else {
-      bigIntegerToBinary(decimal.toJavaBigDecimal.unscaledValue())
-    }
-
-    writer.addBinary(binary)
-  }
-
-  // Scratch array used to write timestamps as INT96 (fixed-length binary)
-  private[this] val int96buf = new Array[Byte](12)
-
-  private[parquet] def writeTimestamp(ts: Long): Unit = {
-    val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(ts)
-    val buf = ByteBuffer.wrap(int96buf)
-    buf.order(ByteOrder.LITTLE_ENDIAN)
-    buf.putLong(timeOfDayNanos)
-    buf.putInt(julianDay)
-    writer.addBinary(Binary.fromByteArray(int96buf))
-  }
-}
-
-// Optimized for non-nested rows
-private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
-  override def write(record: InternalRow): Unit = {
-    val attributesSize = attributes.size
-    if (attributesSize > record.numFields) {
-      throw new IndexOutOfBoundsException("Trying to write more fields than contained in row " +
-        s"($attributesSize > ${record.numFields})")
-    }
-
-    var index = 0
-    writer.startMessage()
-    while(index < attributesSize) {
-      // Null values indicate optional fields, but we do not currently validate this.
-      if (!record.isNullAt(index)) {
-        writer.startField(attributes(index).name, index)
-        consumeType(attributes(index).dataType, record, index)
-        writer.endField(attributes(index).name, index)
-      }
-      index = index + 1
-    }
-    writer.endMessage()
-  }
-
-  private def consumeType(
-      ctype: DataType,
-      record: InternalRow,
-      index: Int): Unit = {
-    ctype match {
-      case BooleanType => writer.addBoolean(record.getBoolean(index))
-      case ByteType => writer.addInteger(record.getByte(index))
-      case ShortType => writer.addInteger(record.getShort(index))
-      case IntegerType | DateType => writer.addInteger(record.getInt(index))
-      case LongType => writer.addLong(record.getLong(index))
-      case TimestampType => writeTimestamp(record.getLong(index))
-      case FloatType => writer.addFloat(record.getFloat(index))
-      case DoubleType => writer.addDouble(record.getDouble(index))
-      case StringType =>
-        writer.addBinary(Binary.fromByteArray(record.getUTF8String(index).getBytes))
-      case BinaryType =>
-        writer.addBinary(Binary.fromByteArray(record.getBinary(index)))
-      case DecimalType.Fixed(precision, scale) =>
-        writeDecimal(record.getDecimal(index, precision, scale), precision)
-      case _ => sys.error(s"Unsupported datatype $ctype, cannot write to consumer")
-    }
-  }
-}
-
-private[parquet] object RowWriteSupport {
-  val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"
-
-  def getSchema(configuration: Configuration): Seq[Attribute] = {
-    val schemaString = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
-    if (schemaString == null) {
-      throw new RuntimeException("Missing schema!")
-    }
-    ParquetTypesConverter.convertFromString(schemaString)
-  }
-
-  def setSchema(schema: Seq[Attribute], configuration: Configuration) {
-    val encoded = ParquetTypesConverter.convertToString(schema)
-    configuration.set(SPARK_ROW_SCHEMA, encoded)
-    configuration.set(
-      ParquetOutputFormat.WRITER_VERSION,
-      ParquetProperties.WriterVersion.PARQUET_1_0.toString)
-  }
-}

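The removed `writeTimestamp` above packs a timestamp into Parquet's INT96 layout: eight little-endian bytes of nanoseconds-within-day followed by a four-byte little-endian Julian day number. A minimal, self-contained sketch of that encoding (`int96Bytes` is a made-up helper name; `julianDay` and `timeOfDayNanos` stand in for the values `DateTimeUtils.toJulianDay` would produce):

    import java.nio.{ByteBuffer, ByteOrder}

    def int96Bytes(julianDay: Int, timeOfDayNanos: Long): Array[Byte] = {
      // Bytes 0-7: nanoseconds since midnight, little-endian.
      // Bytes 8-11: Julian day number, little-endian.
      val buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
      buf.putLong(timeOfDayNanos)
      buf.putInt(julianDay)
      buf.array()
    }
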
http://git-wip-us.apache.org/repos/asf/spark/blob/02149ff0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
deleted file mode 100644
index b647bb6..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet
-
-import java.io.IOException
-import java.util.{Collections, Arrays}
-
-import scala.util.Try
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.mapreduce.Job
-import org.apache.parquet.format.converter.ParquetMetadataConverter
-import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
-import org.apache.parquet.hadoop.util.ContextUtil
-import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
-import org.apache.parquet.schema.MessageType
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.types._
-
-
-private[parquet] object ParquetTypesConverter extends Logging {
-  def isPrimitiveType(ctype: DataType): Boolean = ctype match {
-    case _: NumericType | BooleanType | DateType | TimestampType | StringType | BinaryType => true
-    case _ => false
-  }
-
-  /**
-   * Compute the FIXED_LEN_BYTE_ARRAY length needed to represent a given DECIMAL precision.
-   */
-  private[parquet] val BYTES_FOR_PRECISION = Array.tabulate[Int](38) { precision =>
-    var length = 1
-    while (math.pow(2.0, 8 * length - 1) < math.pow(10.0, precision)) {
-      length += 1
-    }
-    length
-  }
-
-  def convertFromAttributes(attributes: Seq[Attribute]): MessageType = {
-    val converter = new CatalystSchemaConverter()
-    converter.convert(StructType.fromAttributes(attributes))
-  }
-
-  def convertFromString(string: String): Seq[Attribute] = {
-    Try(DataType.fromJson(string)).getOrElse(DataType.fromCaseClassString(string)) match {
-      case s: StructType => s.toAttributes
-      case other => sys.error(s"Cannot convert $string to a row")
-    }
-  }
-
-  def convertToString(schema: Seq[Attribute]): String = {
-    schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
-    StructType.fromAttributes(schema).json
-  }
-
-  def writeMetaData(attributes: Seq[Attribute], origPath: Path, conf: Configuration): Unit = {
-    if (origPath == null) {
-      throw new IllegalArgumentException("Unable to write Parquet metadata: path is null")
-    }
-    val fs = origPath.getFileSystem(conf)
-    if (fs == null) {
-      throw new IllegalArgumentException(
-        s"Unable to write Parquet metadata: path $origPath is incorrectly formatted")
-    }
-    val path = origPath.makeQualified(fs)
-    if (fs.exists(path) && !fs.getFileStatus(path).isDir) {
-      throw new IllegalArgumentException(s"Expected to write to directory $path but found file")
-    }
-    val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)
-    if (fs.exists(metadataPath)) {
-      try {
-        fs.delete(metadataPath, true)
-      } catch {
-        case e: IOException =>
-          throw new IOException(s"Unable to delete previous PARQUET_METADATA_FILE at $metadataPath")
-      }
-    }
-    val extraMetadata = new java.util.HashMap[String, String]()
-    extraMetadata.put(
-      CatalystReadSupport.SPARK_METADATA_KEY,
-      ParquetTypesConverter.convertToString(attributes))
-    // TODO: add extra data, e.g., table name, date, etc.?
-
-    val parquetSchema: MessageType = ParquetTypesConverter.convertFromAttributes(attributes)
-    val metaData: FileMetaData = new FileMetaData(
-      parquetSchema,
-      extraMetadata,
-      "Spark")
-
-    ParquetFileWriter.writeMetadataFile(
-      conf,
-      path,
-      Arrays.asList(new Footer(path, new ParquetMetadata(metaData, Collections.emptyList()))))
-  }
-
-  /**
-   * Try to read Parquet metadata at the given Path. We first see if there is a summary file
-   * in the parent directory. If so, this is used. Else we read the actual footer at the given
-   * location.
-   * @param origPath The path at which we expect one (or more) Parquet files.
-   * @param configuration The Hadoop configuration to use.
-   * @return The `ParquetMetadata` containing among other things the schema.
-   */
-  def readMetaData(origPath: Path, configuration: Option[Configuration]): ParquetMetadata = {
-    if (origPath == null) {
-      throw new IllegalArgumentException("Unable to read Parquet metadata: path is null")
-    }
-    val job = new Job()
-    val conf = {
-      // scalastyle:off jobcontext
-      configuration.getOrElse(ContextUtil.getConfiguration(job))
-      // scalastyle:on jobcontext
-    }
-    val fs: FileSystem = origPath.getFileSystem(conf)
-    if (fs == null) {
-      throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath")
-    }
-    val path = origPath.makeQualified(fs)
-
-    val children =
-      fs
-        .globStatus(path)
-        .flatMap { status => if (status.isDir) fs.listStatus(status.getPath) else List(status) }
-        .filterNot { status =>
-          val name = status.getPath.getName
-          (name(0) == '.' || name(0) == '_') && name != ParquetFileWriter.PARQUET_METADATA_FILE
-        }
-
-    // NOTE (lian): Reading the Parquet "_metadata" file can be very slow if it contains lots of row
-    // groups. Since the Parquet schema is replicated across all row groups, we only need to touch a
-    // single row group to read schema-related metadata. Note that this assumes all data in a single
-    // Parquet file share the same schema, which is normally true.
-    children
-      // Try any non-"_metadata" file first...
-      .find(_.getPath.getName != ParquetFileWriter.PARQUET_METADATA_FILE)
-      // ... and fall back to "_metadata" if no such file exists (which implies the Parquet file is
-      // empty, so the "_metadata" file is normally expected to be fairly small).
-      .orElse(children.find(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE))
-      .map(ParquetFileReader.readFooter(conf, _, ParquetMetadataConverter.NO_FILTER))
-      .getOrElse(
-        throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path"))
-  }
-}

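The `BYTES_FOR_PRECISION` table deleted above (and `CatalystSchemaConverter.minBytesForPrecision`, which the deleted `writeDecimal` leans on for the same purpose) boils down to finding the smallest byte count whose signed two's-complement range covers every unscaled value of a given decimal precision. A standalone sketch of that computation:

    def minBytesForPrecision(precision: Int): Int = {
      var length = 1
      // Grow until a signed integer of `length` bytes can hold 10^precision - 1.
      while (math.pow(2.0, 8 * length - 1) < math.pow(10.0, precision)) {
        length += 1
      }
      length
    }

For example, precision 9 needs 4 bytes, precision 18 needs 8, and precision 38 needs 16.
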
http://git-wip-us.apache.org/repos/asf/spark/blob/02149ff0/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index 7992fd5..d17671d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -24,6 +24,7 @@ import com.clearspring.analytics.stream.cardinality.HyperLogLog
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.expressions.{OpenHashSetUDT, HyperLogLogUDT}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -68,7 +69,7 @@ private[sql] class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
   private[spark] override def asNullable: MyDenseVectorUDT = this
 }
 
-class UserDefinedTypeSuite extends QueryTest with SharedSQLContext {
+class UserDefinedTypeSuite extends QueryTest with SharedSQLContext with ParquetTest {
   import testImplicits._
 
   private lazy val pointsRDD = Seq(
@@ -98,17 +99,28 @@ class UserDefinedTypeSuite extends QueryTest with SharedSQLContext {
       Seq(Row(true), Row(true)))
   }
 
-
-  test("UDTs with Parquet") {
-    val tempDir = Utils.createTempDir()
-    tempDir.delete()
-    pointsRDD.write.parquet(tempDir.getCanonicalPath)
+  testStandardAndLegacyModes("UDTs with Parquet") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      pointsRDD.write.parquet(path)
+      checkAnswer(
+        sqlContext.read.parquet(path),
+        Seq(
+          Row(1.0, new MyDenseVector(Array(0.1, 1.0))),
+          Row(0.0, new MyDenseVector(Array(0.2, 2.0)))))
+    }
   }
 
-  test("Repartition UDTs with Parquet") {
-    val tempDir = Utils.createTempDir()
-    tempDir.delete()
-    pointsRDD.repartition(1).write.parquet(tempDir.getCanonicalPath)
+  testStandardAndLegacyModes("Repartition UDTs with Parquet") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      pointsRDD.repartition(1).write.parquet(path)
+      checkAnswer(
+        sqlContext.read.parquet(path),
+        Seq(
+          Row(1.0, new MyDenseVector(Array(0.1, 1.0))),
+          Row(0.0, new MyDenseVector(Array(0.2, 2.0)))))
+    }
   }
 
   // Tests to make sure that all operators correctly convert types on the way out.

http://git-wip-us.apache.org/repos/asf/spark/blob/02149ff0/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index cd552e8..599cf94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -28,10 +28,10 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 import org.apache.parquet.example.data.simple.SimpleGroup
 import org.apache.parquet.example.data.{Group, GroupWriter}
+import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
-import org.apache.parquet.hadoop.metadata.{BlockMetaData, CompressionCodecName, FileMetaData, ParquetMetadata}
-import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetOutputCommitter, ParquetWriter}
+import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
 import org.apache.parquet.io.api.RecordConsumer
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
@@ -99,16 +99,18 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true")(checkParquetFile(data))
   }
 
-  test("fixed-length decimals") {
-    def makeDecimalRDD(decimal: DecimalType): DataFrame =
-      sparkContext
-        .parallelize(0 to 1000)
-        .map(i => Tuple1(i / 100.0))
-        .toDF()
-        // Parquet doesn't allow column names with spaces, have to add an alias here
-        .select($"_1" cast decimal as "dec")
+  testStandardAndLegacyModes("fixed-length decimals") {
+    def makeDecimalRDD(decimal: DecimalType): DataFrame = {
+      sqlContext
+        .range(1000)
+        // Parquet doesn't allow column names with spaces, have to add an alias here.
+        // Minus 500 here so that negative decimals are also tested.
+        .select((('id - 500) / 100.0) cast decimal as 'dec)
+        .coalesce(1)
+    }
 
-    for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17), (19, 0), (38, 37))) {
+    val combinations = Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17), (19, 0), (38, 37))
+    for ((precision, scale) <- combinations) {
       withTempPath { dir =>
         val data = makeDecimalRDD(DecimalType(precision, scale))
         data.write.parquet(dir.getCanonicalPath)
@@ -132,22 +134,22 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
-  test("map") {
+  testStandardAndLegacyModes("map") {
     val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i")))
     checkParquetFile(data)
   }
 
-  test("array") {
+  testStandardAndLegacyModes("array") {
     val data = (1 to 4).map(i => Tuple1(Seq(i, i + 1)))
     checkParquetFile(data)
   }
 
-  test("array and double") {
+  testStandardAndLegacyModes("array and double") {
     val data = (1 to 4).map(i => (i.toDouble, Seq(i.toDouble, (i + 1).toDouble)))
     checkParquetFile(data)
   }
 
-  test("struct") {
+  testStandardAndLegacyModes("struct") {
     val data = (1 to 4).map(i => Tuple1((i, s"val_$i")))
     withParquetDataFrame(data) { df =>
       // Structs are converted to `Row`s
@@ -157,7 +159,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
-  test("nested struct with array of array as field") {
+  testStandardAndLegacyModes("nested struct with array of array as field") {
     val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i")))))
     withParquetDataFrame(data) { df =>
       // Structs are converted to `Row`s
@@ -167,7 +169,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
-  test("nested map with struct as value type") {
+  testStandardAndLegacyModes("nested map with struct as value type") {
     val data = (1 to 4).map(i => Tuple1(Map(i -> (i, s"val_$i"))))
     withParquetDataFrame(data) { df =>
       checkAnswer(df, data.map { case Tuple1(m) =>
@@ -205,14 +207,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
   }
 
   test("compression codec") {
-    def compressionCodecFor(path: String): String = {
-      val codecs = ParquetTypesConverter
-        .readMetaData(new Path(path), Some(hadoopConfiguration)).getBlocks.asScala
-        .flatMap(_.getColumns.asScala)
-        .map(_.getCodec.name())
-        .distinct
-
-      assert(codecs.size === 1)
+    def compressionCodecFor(path: String, codecName: String): String = {
+      val codecs = for {
+        footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConfiguration)
+        block <- footer.getParquetMetadata.getBlocks.asScala
+        column <- block.getColumns.asScala
+      } yield column.getCodec.name()
+
+      assert(codecs.distinct === Seq(codecName))
       codecs.head
     }
 
@@ -222,7 +224,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
         withParquetFile(data) { path =>
           assertResult(sqlContext.conf.parquetCompressionCodec.toUpperCase) {
-            compressionCodecFor(path)
+            compressionCodecFor(path, codec.name())
           }
         }
       }
@@ -278,15 +280,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     withTempPath { file =>
       val path = new Path(file.toURI.toString)
       val fs = FileSystem.getLocal(hadoopConfiguration)
-      val attributes = ScalaReflection.attributesFor[(Int, String)]
-      ParquetTypesConverter.writeMetaData(attributes, path, hadoopConfiguration)
+      val schema = StructType.fromAttributes(ScalaReflection.attributesFor[(Int, String)])
+      writeMetadata(schema, path, hadoopConfiguration)
 
       assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)))
       assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
 
-      val metaData = ParquetTypesConverter.readMetaData(path, Some(hadoopConfiguration))
-      val actualSchema = metaData.getFileMetaData.getSchema
-      val expectedSchema = ParquetTypesConverter.convertFromAttributes(attributes)
+      val expectedSchema = new CatalystSchemaConverter().convert(schema)
+      val actualSchema = readFooter(path, hadoopConfiguration).getFileMetaData.getSchema
 
       actualSchema.checkContains(expectedSchema)
       expectedSchema.checkContains(actualSchema)

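The reworked fixed-length decimals test now exercises precisions from 1 to 38 in both write modes. Under the parquet-format decimal rules, an unscaled value of up to 9 digits fits in an INT32 and up to 18 digits in an INT64, while the legacy Spark 1.4 layout (see the removed `writeDecimal` above) always used a FIXED_LEN_BYTE_ARRAY. A hedged sketch of that mapping, not taken from Spark's writer code:

    def decimalPhysicalType(precision: Int, writeLegacyFormat: Boolean): String = {
      if (writeLegacyFormat) "FIXED_LEN_BYTE_ARRAY"  // Spark 1.4-compatible layout
      else if (precision <= 9) "INT32"               // unscaled value fits in an Int
      else if (precision <= 18) "INT64"              // unscaled value fits in a Long
      else "FIXED_LEN_BYTE_ARRAY"                    // fallback for larger precisions
    }
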
http://git-wip-us.apache.org/repos/asf/spark/blob/02149ff0/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 1c1cfa3..cc02ef8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -484,7 +484,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
-  test("SPARK-10301 requested schema clipping - UDT") {
+  testStandardAndLegacyModes("SPARK-10301 requested schema clipping - UDT") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
 
@@ -517,6 +517,50 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
         Row(Row(NestedStruct(1, 2L, 3.5D))))
     }
   }
+
+  test("expand UDT in StructType") {
+    val schema = new StructType().add("n", new NestedStructUDT, nullable = true)
+    val expected = new StructType().add("n", new NestedStructUDT().sqlType, nullable = true)
+    assert(CatalystReadSupport.expandUDT(schema) === expected)
+  }
+
+  test("expand UDT in ArrayType") {
+    val schema = new StructType().add(
+      "n",
+      ArrayType(
+        elementType = new NestedStructUDT,
+        containsNull = false),
+      nullable = true)
+
+    val expected = new StructType().add(
+      "n",
+      ArrayType(
+        elementType = new NestedStructUDT().sqlType,
+        containsNull = false),
+      nullable = true)
+
+    assert(CatalystReadSupport.expandUDT(schema) === expected)
+  }
+
+  test("expand UDT in MapType") {
+    val schema = new StructType().add(
+      "n",
+      MapType(
+        keyType = IntegerType,
+        valueType = new NestedStructUDT,
+        valueContainsNull = false),
+      nullable = true)
+
+    val expected = new StructType().add(
+      "n",
+      MapType(
+        keyType = IntegerType,
+        valueType = new NestedStructUDT().sqlType,
+        valueContainsNull = false),
+      nullable = true)
+
+    assert(CatalystReadSupport.expandUDT(schema) === expected)
+  }
 }
 
 object TestingUDT {

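The new expandUDT tests pin down how `CatalystReadSupport` is expected to replace user-defined types with their underlying `sqlType` inside structs, arrays and maps. A minimal sketch of that kind of expansion (not the actual `CatalystReadSupport` code) could look like:

    import org.apache.spark.sql.types._

    def expandUDT(dataType: DataType): DataType = dataType match {
      // Replace the UDT with its SQL representation, recursively in case it nests more UDTs.
      case udt: UserDefinedType[_] => expandUDT(udt.sqlType)
      case ArrayType(elementType, containsNull) =>
        ArrayType(expandUDT(elementType), containsNull)
      case MapType(keyType, valueType, valueContainsNull) =>
        MapType(expandUDT(keyType), expandUDT(valueType), valueContainsNull)
      case StructType(fields) =>
        StructType(fields.map(f => f.copy(dataType = expandUDT(f.dataType))))
      case other => other
    }
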
http://git-wip-us.apache.org/repos/asf/spark/blob/02149ff0/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index f17fb36..60fa81b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -357,8 +357,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     val jsonString = """{"type":"struct","fields":[{"name":"c1","type":"integer","nullable":false,"metadata":{}},{"name":"c2","type":"binary","nullable":true,"metadata":{}}]}"""
     // scalastyle:on
 
-    val fromCaseClassString = ParquetTypesConverter.convertFromString(caseClassString)
-    val fromJson = ParquetTypesConverter.convertFromString(jsonString)
+    val fromCaseClassString = StructType.fromString(caseClassString)
+    val fromJson = StructType.fromString(jsonString)
 
     (fromCaseClassString, fromJson).zipped.foreach { (a, b) =>
       assert(a.name == b.name)

http://git-wip-us.apache.org/repos/asf/spark/blob/02149ff0/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index 442fafb..9840ad9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -19,11 +19,19 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.File
 
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop.metadata.{BlockMetaData, FileMetaData, ParquetMetadata}
+import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
+
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLConf, SaveMode}
 
 /**
  * A helper trait that provides convenient facilities for Parquet testing.
@@ -97,4 +105,38 @@ private[sql] trait ParquetTest extends SQLTestUtils {
     assert(partDir.mkdirs(), s"Couldn't create directory $partDir")
     partDir
   }
+
+  protected def writeMetadata(
+      schema: StructType, path: Path, configuration: Configuration): Unit = {
+    val parquetSchema = new CatalystSchemaConverter().convert(schema)
+    val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> schema.json).asJava
+    val createdBy = s"Apache Spark ${org.apache.spark.SPARK_VERSION}"
+    val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, createdBy)
+    val parquetMetadata = new ParquetMetadata(fileMetadata, Seq.empty[BlockMetaData].asJava)
+    val footer = new Footer(path, parquetMetadata)
+    ParquetFileWriter.writeMetadataFile(configuration, path, Seq(footer).asJava)
+  }
+
+  protected def readAllFootersWithoutSummaryFiles(
+      path: Path, configuration: Configuration): Seq[Footer] = {
+    val fs = path.getFileSystem(configuration)
+    ParquetFileReader.readAllFootersInParallel(configuration, fs.getFileStatus(path)).asScala.toSeq
+  }
+
+  protected def readFooter(path: Path, configuration: Configuration): ParquetMetadata = {
+    ParquetFileReader.readFooter(
+      configuration,
+      new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE),
+      ParquetMetadataConverter.NO_FILTER)
+  }
+
+  protected def testStandardAndLegacyModes(testName: String)(f: => Unit): Unit = {
+    test(s"Standard mode - $testName") {
+      withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false") { f }
+    }
+
+    test(s"Legacy mode - $testName") {
+      withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "true") { f }
+    }
+  }
 }

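With the `testStandardAndLegacyModes` helper added above, a suite mixing in `ParquetTest` gets two test cases per call, one for each value of `spark.sql.parquet.writeLegacyFormat`. An illustrative usage (the test name and body are made up):

    testStandardAndLegacyModes("round-trips a simple tuple") {
      withParquetDataFrame((1 to 4).map(i => Tuple1(i))) { df =>
        assert(df.collect().length === 4)
      }
    }
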
http://git-wip-us.apache.org/repos/asf/spark/blob/02149ff0/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 107457f..d63f3d3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -20,11 +20,11 @@ package org.apache.spark.sql.hive
 import java.io.File
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{QueryTest, Row, SaveMode}
 import org.apache.spark.sql.hive.client.{ExternalTable, ManagedTable}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
 import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
+import org.apache.spark.sql.{SQLConf, QueryTest, Row, SaveMode}
 
 class HiveMetastoreCatalogSuite extends SparkFunSuite with TestHiveSingleton {
   import hiveContext.implicits._
@@ -74,11 +74,13 @@ class DataSourceWithHiveMetastoreCatalogSuite
   ).foreach { case (provider, (inputFormat, outputFormat, serde)) =>
     test(s"Persist non-partitioned $provider relation into metastore as managed table") {
       withTable("t") {
-        testDF
-          .write
-          .mode(SaveMode.Overwrite)
-          .format(provider)
-          .saveAsTable("t")
+        withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "true") {
+          testDF
+            .write
+            .mode(SaveMode.Overwrite)
+            .format(provider)
+            .saveAsTable("t")
+        }
 
         val hiveTable = catalog.client.getTable("default", "t")
         assert(hiveTable.inputFormat === Some(inputFormat))
@@ -102,12 +104,14 @@ class DataSourceWithHiveMetastoreCatalogSuite
         withTable("t") {
           val path = dir.getCanonicalFile
 
-          testDF
-            .write
-            .mode(SaveMode.Overwrite)
-            .format(provider)
-            .option("path", path.toString)
-            .saveAsTable("t")
+          withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "true") {
+            testDF
+              .write
+              .mode(SaveMode.Overwrite)
+              .format(provider)
+              .option("path", path.toString)
+              .saveAsTable("t")
+          }
 
           val hiveTable = catalog.client.getTable("default", "t")
           assert(hiveTable.inputFormat === Some(inputFormat))


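The Hive metastore tests above flip `SQLConf.PARQUET_WRITE_LEGACY_FORMAT` to "true" around `saveAsTable`. Outside of tests, the same switch can be set on a `SQLContext`; the helper and table name below are purely illustrative:

    import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

    def saveAsLegacyParquetTable(sqlContext: SQLContext, df: DataFrame): Unit = {
      // Write nested types and decimals in the Spark 1.4-compatible layout.
      sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")
      df.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("legacy_parquet_table")
    }
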