You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/11/11 21:40:45 UTC

spark git commit: [SPARK-10365][SQL] Support Parquet logical type TIMESTAMP_MICROS

Repository: spark
Updated Branches:
  refs/heads/master d6ee69e77 -> 21a7bfd5c


[SPARK-10365][SQL] Support Parquet logical type TIMESTAMP_MICROS

## What changes were proposed in this pull request?

This PR makes Spark to be able to read Parquet TIMESTAMP_MICROS values, and add a new config to allow Spark to write timestamp values to parquet as TIMESTAMP_MICROS type.

## How was this patch tested?

new test

Author: Wenchen Fan <we...@databricks.com>

Closes #19702 from cloud-fan/parquet.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21a7bfd5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21a7bfd5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21a7bfd5

Branch: refs/heads/master
Commit: 21a7bfd5c324e6c82152229f1394f26afeae771c
Parents: d6ee69e
Author: Wenchen Fan <we...@databricks.com>
Authored: Sat Nov 11 22:40:26 2017 +0100
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sat Nov 11 22:40:26 2017 +0100

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  30 ++++-
 .../SpecificParquetRecordReaderBase.java        |   4 +-
 .../parquet/VectorizedColumnReader.java         |  26 +++--
 .../parquet/VectorizedParquetRecordReader.java  |  11 +-
 .../datasources/parquet/ParquetFileFormat.scala |  51 +++------
 .../parquet/ParquetReadSupport.scala            |   4 +-
 .../parquet/ParquetRecordMaterializer.scala     |   4 +-
 .../parquet/ParquetRowConverter.scala           |  15 ++-
 .../parquet/ParquetSchemaConverter.scala        |  92 +++++++--------
 .../parquet/ParquetWriteSupport.scala           |  48 ++++----
 .../datasources/parquet/ParquetIOSuite.scala    |   5 +-
 .../datasources/parquet/ParquetQuerySuite.scala |  22 ++++
 .../parquet/ParquetSchemaSuite.scala            | 111 +++++++------------
 .../datasources/parquet/ParquetTest.scala       |   2 +-
 .../spark/sql/internal/SQLConfSuite.scala       |  28 +++++
 15 files changed, 249 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a04f877..831ef62 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -285,8 +285,24 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  object ParquetOutputTimestampType extends Enumeration {
+    val INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS = Value
+  }
+
+  val PARQUET_OUTPUT_TIMESTAMP_TYPE = buildConf("spark.sql.parquet.outputTimestampType")
+    .doc("Sets which Parquet timestamp type to use when Spark writes data to Parquet files. " +
+      "INT96 is a non-standard but commonly used timestamp type in Parquet. TIMESTAMP_MICROS " +
+      "is a standard timestamp type in Parquet, which stores number of microseconds from the " +
+      "Unix epoch. TIMESTAMP_MILLIS is also standard, but with millisecond precision, which " +
+      "means Spark has to truncate the microsecond portion of its timestamp value.")
+    .stringConf
+    .transform(_.toUpperCase(Locale.ROOT))
+    .checkValues(ParquetOutputTimestampType.values.map(_.toString))
+    .createWithDefault(ParquetOutputTimestampType.INT96.toString)
+
   val PARQUET_INT64_AS_TIMESTAMP_MILLIS = buildConf("spark.sql.parquet.int64AsTimestampMillis")
-    .doc("When true, timestamp values will be stored as INT64 with TIMESTAMP_MILLIS as the " +
+    .doc(s"(Deprecated since Spark 2.3, please set ${PARQUET_OUTPUT_TIMESTAMP_TYPE.key}.) " +
+      "When true, timestamp values will be stored as INT64 with TIMESTAMP_MILLIS as the " +
       "extended type. In this mode, the microsecond portion of the timestamp value will be" +
       "truncated.")
     .booleanConf
@@ -1143,6 +1159,18 @@ class SQLConf extends Serializable with Logging {
 
   def isParquetINT64AsTimestampMillis: Boolean = getConf(PARQUET_INT64_AS_TIMESTAMP_MILLIS)
 
+  def parquetOutputTimestampType: ParquetOutputTimestampType.Value = {
+    val isOutputTimestampTypeSet = settings.containsKey(PARQUET_OUTPUT_TIMESTAMP_TYPE.key)
+    if (!isOutputTimestampTypeSet && isParquetINT64AsTimestampMillis) {
+      // If PARQUET_OUTPUT_TIMESTAMP_TYPE is not set and PARQUET_INT64_AS_TIMESTAMP_MILLIS is set,
+      // respect PARQUET_INT64_AS_TIMESTAMP_MILLIS and use TIMESTAMP_MILLIS. Otherwise,
+      // PARQUET_OUTPUT_TIMESTAMP_TYPE has higher priority.
+      ParquetOutputTimestampType.TIMESTAMP_MILLIS
+    } else {
+      ParquetOutputTimestampType.withName(getConf(PARQUET_OUTPUT_TIMESTAMP_TYPE))
+    }
+  }
+
   def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT)
 
   def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)

http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 5a810ca..80c2f49 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -197,8 +197,6 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     Configuration config = new Configuration();
     config.set("spark.sql.parquet.binaryAsString", "false");
     config.set("spark.sql.parquet.int96AsTimestamp", "false");
-    config.set("spark.sql.parquet.writeLegacyFormat", "false");
-    config.set("spark.sql.parquet.int64AsTimestampMillis", "false");
 
     this.file = new Path(path);
     long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
@@ -224,7 +222,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
         this.requestedSchema = ParquetSchemaConverter.EMPTY_MESSAGE();
       }
     }
-    this.sparkSchema = new ParquetSchemaConverter(config).convert(requestedSchema);
+    this.sparkSchema = new ParquetToSparkSchemaConverter(config).convert(requestedSchema);
     this.reader = new ParquetFileReader(
         config, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
     for (BlockMetaData block : blocks) {

http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 3c8d766..0f1f470 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -26,6 +26,7 @@ import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.page.*;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
@@ -91,11 +92,15 @@ public class VectorizedColumnReader {
 
   private final PageReader pageReader;
   private final ColumnDescriptor descriptor;
+  private final OriginalType originalType;
 
-  public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader)
-      throws IOException {
+  public VectorizedColumnReader(
+      ColumnDescriptor descriptor,
+      OriginalType originalType,
+      PageReader pageReader) throws IOException {
     this.descriptor = descriptor;
     this.pageReader = pageReader;
+    this.originalType = originalType;
     this.maxDefLevel = descriptor.getMaxDefinitionLevel();
 
     DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
@@ -158,12 +163,12 @@ public class VectorizedColumnReader {
         defColumn.readIntegers(
             num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
 
-        // Timestamp values encoded as INT64 can't be lazily decoded as we need to post process
+        // TIMESTAMP_MILLIS encoded as INT64 can't be lazily decoded as we need to post process
         // the values to add microseconds precision.
         if (column.hasDictionary() || (rowId == 0 &&
             (descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32 ||
             (descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64  &&
-               column.dataType() != DataTypes.TimestampType) ||
+              originalType != OriginalType.TIMESTAMP_MILLIS) ||
             descriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT ||
             descriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE ||
             descriptor.getType() == PrimitiveType.PrimitiveTypeName.BINARY))) {
@@ -253,21 +258,21 @@ public class VectorizedColumnReader {
 
       case INT64:
         if (column.dataType() == DataTypes.LongType ||
-            DecimalType.is64BitDecimalType(column.dataType())) {
+            DecimalType.is64BitDecimalType(column.dataType()) ||
+            originalType == OriginalType.TIMESTAMP_MICROS) {
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
               column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
             }
           }
-        } else if (column.dataType() == DataTypes.TimestampType) {
+        } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
               column.putLong(i,
                 DateTimeUtils.fromMillis(dictionary.decodeToLong(dictionaryIds.getDictId(i))));
             }
           }
-        }
-        else {
+        } else {
           throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
         }
         break;
@@ -377,10 +382,11 @@ public class VectorizedColumnReader {
   private void readLongBatch(int rowId, int num, WritableColumnVector column) {
     // This is where we implement support for the valid type conversions.
     if (column.dataType() == DataTypes.LongType ||
-        DecimalType.is64BitDecimalType(column.dataType())) {
+        DecimalType.is64BitDecimalType(column.dataType()) ||
+        originalType == OriginalType.TIMESTAMP_MICROS) {
       defColumn.readLongs(
         num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
-    } else if (column.dataType() == DataTypes.TimestampType) {
+    } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
       for (int i = 0; i < num; i++) {
         if (defColumn.readInteger() == maxDefLevel) {
           column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong()));

http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 0cacf0c..e827229 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -165,8 +165,10 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
   // Columns 0,1: data columns
   // Column 2: partitionValues[0]
   // Column 3: partitionValues[1]
-  public void initBatch(MemoryMode memMode, StructType partitionColumns,
-                        InternalRow partitionValues) {
+  public void initBatch(
+      MemoryMode memMode,
+      StructType partitionColumns,
+      InternalRow partitionValues) {
     StructType batchSchema = new StructType();
     for (StructField f: sparkSchema.fields()) {
       batchSchema = batchSchema.add(f);
@@ -281,11 +283,12 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
           + rowsReturned + " out of " + totalRowCount);
     }
     List<ColumnDescriptor> columns = requestedSchema.getColumns();
+    List<Type> types = requestedSchema.asGroupType().getFields();
     columnReaders = new VectorizedColumnReader[columns.size()];
     for (int i = 0; i < columns.size(); ++i) {
       if (missingColumns[i]) continue;
-      columnReaders[i] = new VectorizedColumnReader(columns.get(i),
-          pages.getPageReader(columns.get(i)));
+      columnReaders[i] = new VectorizedColumnReader(
+        columns.get(i), types.get(i).getOriginalType(), pages.getPageReader(columns.get(i)));
     }
     totalCountLoadedSoFar += pages.getRowCount();
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 61bd65d..a48f8d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -111,23 +111,15 @@ class ParquetFileFormat
     // This metadata is only useful for detecting optional columns when pushdowning filters.
     ParquetWriteSupport.setSchema(dataSchema, conf)
 
-    // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
-    // and `CatalystWriteSupport` (writing actual rows to Parquet files).
-    conf.set(
-      SQLConf.PARQUET_BINARY_AS_STRING.key,
-      sparkSession.sessionState.conf.isParquetBinaryAsString.toString)
-
-    conf.set(
-      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-      sparkSession.sessionState.conf.isParquetINT96AsTimestamp.toString)
-
+    // Sets flags for `ParquetWriteSupport`, which converts Catalyst schema to Parquet
+    // schema and writes actual rows to Parquet files.
     conf.set(
       SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
       sparkSession.sessionState.conf.writeLegacyParquetFormat.toString)
 
     conf.set(
-      SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key,
-      sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis.toString)
+      SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
+      sparkSession.sessionState.conf.parquetOutputTimestampType.toString)
 
     // Sets compression scheme
     conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
@@ -312,16 +304,13 @@ class ParquetFileFormat
 
     ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
 
-    // Sets flags for `CatalystSchemaConverter`
+    // Sets flags for `ParquetToSparkSchemaConverter`
     hadoopConf.setBoolean(
       SQLConf.PARQUET_BINARY_AS_STRING.key,
       sparkSession.sessionState.conf.isParquetBinaryAsString)
     hadoopConf.setBoolean(
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
-    hadoopConf.setBoolean(
-      SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key,
-      sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis)
 
     // Try to push down filters when filter push-down is enabled.
     val pushed =
@@ -428,15 +417,9 @@ object ParquetFileFormat extends Logging {
   private[parquet] def readSchema(
       footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {
 
-    def parseParquetSchema(schema: MessageType): StructType = {
-      val converter = new ParquetSchemaConverter(
-        sparkSession.sessionState.conf.isParquetBinaryAsString,
-        sparkSession.sessionState.conf.isParquetBinaryAsString,
-        sparkSession.sessionState.conf.writeLegacyParquetFormat,
-        sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis)
-
-      converter.convert(schema)
-    }
+    val converter = new ParquetToSparkSchemaConverter(
+      sparkSession.sessionState.conf.isParquetBinaryAsString,
+      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
 
     val seen = mutable.HashSet[String]()
     val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
@@ -447,7 +430,7 @@ object ParquetFileFormat extends Logging {
         .get(ParquetReadSupport.SPARK_METADATA_KEY)
       if (serializedSchema.isEmpty) {
         // Falls back to Parquet schema if no Spark SQL schema found.
-        Some(parseParquetSchema(metadata.getSchema))
+        Some(converter.convert(metadata.getSchema))
       } else if (!seen.contains(serializedSchema.get)) {
         seen += serializedSchema.get
 
@@ -470,7 +453,7 @@ object ParquetFileFormat extends Logging {
           .map(_.asInstanceOf[StructType])
           .getOrElse {
             // Falls back to Parquet schema if Spark SQL schema can't be parsed.
-            parseParquetSchema(metadata.getSchema)
+            converter.convert(metadata.getSchema)
           })
       } else {
         None
@@ -538,8 +521,6 @@ object ParquetFileFormat extends Logging {
       sparkSession: SparkSession): Option[StructType] = {
     val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
-    val writeTimestampInMillis = sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis
-    val writeLegacyParquetFormat = sparkSession.sessionState.conf.writeLegacyParquetFormat
     val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())
 
     // !! HACK ALERT !!
@@ -579,13 +560,9 @@ object ParquetFileFormat extends Logging {
               serializedConf.value, fakeFileStatuses, ignoreCorruptFiles)
 
           // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
-          val converter =
-            new ParquetSchemaConverter(
-              assumeBinaryIsString = assumeBinaryIsString,
-              assumeInt96IsTimestamp = assumeInt96IsTimestamp,
-              writeLegacyParquetFormat = writeLegacyParquetFormat,
-              writeTimestampInMillis = writeTimestampInMillis)
-
+          val converter = new ParquetToSparkSchemaConverter(
+            assumeBinaryIsString = assumeBinaryIsString,
+            assumeInt96IsTimestamp = assumeInt96IsTimestamp)
           if (footers.isEmpty) {
             Iterator.empty
           } else {
@@ -625,7 +602,7 @@ object ParquetFileFormat extends Logging {
    * a [[StructType]] converted from the [[MessageType]] stored in this footer.
    */
   def readSchemaFromFooter(
-      footer: Footer, converter: ParquetSchemaConverter): StructType = {
+      footer: Footer, converter: ParquetToSparkSchemaConverter): StructType = {
     val fileMetaData = footer.getParquetMetadata.getFileMetaData
     fileMetaData
       .getKeyValueMetaData

http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index f1a35dd..2854cb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -95,7 +95,7 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo
     new ParquetRecordMaterializer(
       parquetRequestedSchema,
       ParquetReadSupport.expandUDT(catalystRequestedSchema),
-      new ParquetSchemaConverter(conf))
+      new ParquetToSparkSchemaConverter(conf))
   }
 }
 
@@ -270,7 +270,7 @@ private[parquet] object ParquetReadSupport {
   private def clipParquetGroupFields(
       parquetRecord: GroupType, structType: StructType): Seq[Type] = {
     val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
-    val toParquet = new ParquetSchemaConverter(writeLegacyParquetFormat = false)
+    val toParquet = new SparkToParquetSchemaConverter(writeLegacyParquetFormat = false)
     structType.map { f =>
       parquetFieldMap
         .get(f.name)

http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
index 4e49a0d..793755e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
@@ -31,7 +31,9 @@ import org.apache.spark.sql.types.StructType
  * @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters
  */
 private[parquet] class ParquetRecordMaterializer(
-    parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetSchemaConverter)
+    parquetSchema: MessageType,
+    catalystSchema: StructType,
+    schemaConverter: ParquetToSparkSchemaConverter)
   extends RecordMaterializer[UnsafeRow] {
 
   private val rootConverter =

http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 32e6c60..10f6c3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -27,7 +27,7 @@ import org.apache.parquet.column.Dictionary
 import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
 import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type}
 import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64}
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64, INT96}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
@@ -120,7 +120,7 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd
  * @param updater An updater which propagates converted field values to the parent container
  */
 private[parquet] class ParquetRowConverter(
-    schemaConverter: ParquetSchemaConverter,
+    schemaConverter: ParquetToSparkSchemaConverter,
     parquetType: GroupType,
     catalystType: StructType,
     updater: ParentContainerUpdater)
@@ -252,6 +252,13 @@ private[parquet] class ParquetRowConverter(
       case StringType =>
         new ParquetStringConverter(updater)
 
+      case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MICROS =>
+        new ParquetPrimitiveConverter(updater) {
+          override def addLong(value: Long): Unit = {
+            updater.setLong(value)
+          }
+        }
+
       case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS =>
         new ParquetPrimitiveConverter(updater) {
           override def addLong(value: Long): Unit = {
@@ -259,8 +266,8 @@ private[parquet] class ParquetRowConverter(
           }
         }
 
-      case TimestampType =>
-        // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
+      // INT96 timestamp doesn't have a logical type, here we check the physical type instead.
+      case TimestampType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT96 =>
         new ParquetPrimitiveConverter(updater) {
           // Converts nanosecond timestamps stored as INT96
           override def addBinary(value: Binary): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index cd384d1..c61be07 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -30,49 +30,31 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
+
 /**
- * This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]] and
- * vice versa.
+ * This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]].
  *
  * Parquet format backwards-compatibility rules are respected when converting Parquet
  * [[MessageType]] schemas.
  *
  * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
- * @constructor
+ *
  * @param assumeBinaryIsString Whether unannotated BINARY fields should be assumed to be Spark SQL
- *        [[StringType]] fields when converting Parquet a [[MessageType]] to Spark SQL
- *        [[StructType]].  This argument only affects Parquet read path.
+ *        [[StringType]] fields.
  * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL
- *        [[TimestampType]] fields when converting Parquet a [[MessageType]] to Spark SQL
- *        [[StructType]].  Note that Spark SQL [[TimestampType]] is similar to Hive timestamp, which
- *        has optional nanosecond precision, but different from `TIME_MILLS` and `TIMESTAMP_MILLIS`
- *        described in Parquet format spec.  This argument only affects Parquet read path.
- * @param writeLegacyParquetFormat Whether to use legacy Parquet format compatible with Spark 1.4
- *        and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]].
- *        When set to false, use standard format defined in parquet-format spec.  This argument only
- *        affects Parquet write path.
- * @param writeTimestampInMillis Whether to write timestamp values as INT64 annotated by logical
- *        type TIMESTAMP_MILLIS.
- *
+ *        [[TimestampType]] fields.
  */
-private[parquet] class ParquetSchemaConverter(
+class ParquetToSparkSchemaConverter(
     assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
-    assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
-    writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
-    writeTimestampInMillis: Boolean = SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.defaultValue.get) {
+    assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get) {
 
   def this(conf: SQLConf) = this(
     assumeBinaryIsString = conf.isParquetBinaryAsString,
-    assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
-    writeLegacyParquetFormat = conf.writeLegacyParquetFormat,
-    writeTimestampInMillis = conf.isParquetINT64AsTimestampMillis)
+    assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp)
 
   def this(conf: Configuration) = this(
     assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
-    assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
-    writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
-      SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get.toString).toBoolean,
-    writeTimestampInMillis = conf.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key).toBoolean)
+    assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean)
 
 
   /**
@@ -165,6 +147,7 @@ private[parquet] class ParquetSchemaConverter(
           case INT_64 | null => LongType
           case DECIMAL => makeDecimalType(Decimal.MAX_LONG_DIGITS)
           case UINT_64 => typeNotSupported()
+          case TIMESTAMP_MICROS => TimestampType
           case TIMESTAMP_MILLIS => TimestampType
           case _ => illegalType()
         }
@@ -310,6 +293,30 @@ private[parquet] class ParquetSchemaConverter(
       repeatedType.getName == s"${parentName}_tuple"
     }
   }
+}
+
+/**
+ * This converter class is used to convert Spark SQL [[StructType]] to Parquet [[MessageType]].
+ *
+ * @param writeLegacyParquetFormat Whether to use legacy Parquet format compatible with Spark 1.4
+ *        and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]].
+ *        When set to false, use standard format defined in parquet-format spec.  This argument only
+ *        affects Parquet write path.
+ * @param outputTimestampType which parquet timestamp type to use when writing.
+ */
+class SparkToParquetSchemaConverter(
+    writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
+    outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
+      SQLConf.ParquetOutputTimestampType.INT96) {
+
+  def this(conf: SQLConf) = this(
+    writeLegacyParquetFormat = conf.writeLegacyParquetFormat,
+    outputTimestampType = conf.parquetOutputTimestampType)
+
+  def this(conf: Configuration) = this(
+    writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean,
+    outputTimestampType = SQLConf.ParquetOutputTimestampType.withName(
+      conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)))
 
   /**
    * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]].
@@ -363,7 +370,9 @@ private[parquet] class ParquetSchemaConverter(
       case DateType =>
         Types.primitive(INT32, repetition).as(DATE).named(field.name)
 
-      // NOTE: Spark SQL TimestampType is NOT a well defined type in Parquet format spec.
+      // NOTE: Spark SQL can write timestamp values to Parquet using INT96, TIMESTAMP_MICROS or
+      // TIMESTAMP_MILLIS. TIMESTAMP_MICROS is recommended but INT96 is the default to keep the
+      // behavior same as before.
       //
       // As stated in PARQUET-323, Parquet `INT96` was originally introduced to represent nanosecond
       // timestamp in Impala for some historical reasons.  It's not recommended to be used for any
@@ -372,23 +381,18 @@ private[parquet] class ParquetSchemaConverter(
       // `TIMESTAMP_MICROS` which are both logical types annotating `INT64`.
       //
       // Originally, Spark SQL uses the same nanosecond timestamp type as Impala and Hive.  Starting
-      // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store
-      // a timestamp into a `Long`.  This design decision is subject to change though, for example,
-      // we may resort to microsecond precision in the future.
-      //
-      // For Parquet, we plan to write all `TimestampType` value as `TIMESTAMP_MICROS`, but it's
-      // currently not implemented yet because parquet-mr 1.8.1 (the version we're currently using)
-      // hasn't implemented `TIMESTAMP_MICROS` yet, however it supports TIMESTAMP_MILLIS. We will
-      // encode timestamp values as TIMESTAMP_MILLIS annotating INT64 if
-      // 'spark.sql.parquet.int64AsTimestampMillis' is set.
-      //
-      // TODO Converts `TIMESTAMP_MICROS` once parquet-mr implements that.
-
-      case TimestampType if writeTimestampInMillis =>
-        Types.primitive(INT64, repetition).as(TIMESTAMP_MILLIS).named(field.name)
-
+      // from Spark 1.5.0, we resort to a timestamp type with microsecond precision so that we can
+      // store a timestamp into a `Long`.  This design decision is subject to change though, for
+      // example, we may resort to nanosecond precision in the future.
       case TimestampType =>
-        Types.primitive(INT96, repetition).named(field.name)
+        outputTimestampType match {
+          case SQLConf.ParquetOutputTimestampType.INT96 =>
+            Types.primitive(INT96, repetition).named(field.name)
+          case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
+            Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).named(field.name)
+          case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
+            Types.primitive(INT64, repetition).as(TIMESTAMP_MILLIS).named(field.name)
+        }
 
       case BinaryType =>
         Types.primitive(BINARY, repetition).named(field.name)

http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index 63a8666..af4e143 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -66,8 +66,8 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
   // Whether to write data in legacy Parquet format compatible with Spark 1.4 and prior versions
   private var writeLegacyParquetFormat: Boolean = _
 
-  // Whether to write timestamp value with milliseconds precision.
-  private var writeTimestampInMillis: Boolean = _
+  // Which parquet timestamp type to use when writing.
+  private var outputTimestampType: SQLConf.ParquetOutputTimestampType.Value = _
 
   // Reusable byte array used to write timestamps as Parquet INT96 values
   private val timestampBuffer = new Array[Byte](12)
@@ -84,15 +84,15 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
       configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean
     }
 
-    this.writeTimestampInMillis = {
-      assert(configuration.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key) != null)
-      configuration.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key).toBoolean
+    this.outputTimestampType = {
+      val key = SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key
+      assert(configuration.get(key) != null)
+      SQLConf.ParquetOutputTimestampType.withName(configuration.get(key))
     }
 
-
     this.rootFieldWriters = schema.map(_.dataType).map(makeWriter).toArray[ValueWriter]
 
-    val messageType = new ParquetSchemaConverter(configuration).convert(schema)
+    val messageType = new SparkToParquetSchemaConverter(configuration).convert(schema)
     val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava
 
     logInfo(
@@ -163,25 +163,23 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
           recordConsumer.addBinary(
             Binary.fromReusedByteArray(row.getUTF8String(ordinal).getBytes))
 
-      case TimestampType if writeTimestampInMillis =>
-        (row: SpecializedGetters, ordinal: Int) =>
-           val millis = DateTimeUtils.toMillis(row.getLong(ordinal))
-           recordConsumer.addLong(millis)
-
       case TimestampType =>
-        (row: SpecializedGetters, ordinal: Int) => {
-          // TODO Writes `TimestampType` values as `TIMESTAMP_MICROS` once parquet-mr implements it
-          // Currently we only support timestamps stored as INT96, which is compatible with Hive
-          // and Impala.  However, INT96 is to be deprecated.  We plan to support `TIMESTAMP_MICROS`
-          // defined in the parquet-format spec.  But up until writing, the most recent parquet-mr
-          // version (1.8.1) hasn't implemented it yet.
-
-          // NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond
-          // precision.  Nanosecond parts of timestamp values read from INT96 are simply stripped.
-          val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
-          val buf = ByteBuffer.wrap(timestampBuffer)
-          buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
-          recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
+        outputTimestampType match {
+          case SQLConf.ParquetOutputTimestampType.INT96 =>
+            (row: SpecializedGetters, ordinal: Int) =>
+              val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
+              val buf = ByteBuffer.wrap(timestampBuffer)
+              buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
+              recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
+
+          case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
+            (row: SpecializedGetters, ordinal: Int) =>
+              recordConsumer.addLong(row.getLong(ordinal))
+
+          case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
+            (row: SpecializedGetters, ordinal: Int) =>
+              val millis = DateTimeUtils.toMillis(row.getLong(ordinal))
+              recordConsumer.addLong(millis)
         }
 
       case BinaryType =>

http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index d76990b..633cfde 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -110,12 +110,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
         |  required binary h(DECIMAL(32,0));
         |  required fixed_len_byte_array(32) i(DECIMAL(32,0));
         |  required int64 j(TIMESTAMP_MILLIS);
+        |  required int64 k(TIMESTAMP_MICROS);
         |}
       """.stripMargin)
 
     val expectedSparkTypes = Seq(ByteType, ShortType, DateType, DecimalType(1, 0),
       DecimalType(10, 0), StringType, StringType, DecimalType(32, 0), DecimalType(32, 0),
-      TimestampType)
+      TimestampType, TimestampType)
 
     withTempPath { location =>
       val path = new Path(location.getCanonicalPath)
@@ -380,7 +381,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)))
       assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
 
-      val expectedSchema = new ParquetSchemaConverter().convert(schema)
+      val expectedSchema = new SparkToParquetSchemaConverter().convert(schema)
       val actualSchema = readFooter(path, hadoopConf).getFileMetaData.getSchema
 
       actualSchema.checkContains(expectedSchema)

http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index e822e40..4c8c9ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -235,6 +235,28 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  test("SPARK-10365 timestamp written and read as INT64 - TIMESTAMP_MICROS") {
+    val data = (1 to 10).map { i =>
+      val ts = new java.sql.Timestamp(i)
+      ts.setNanos(2000)
+      Row(i, ts)
+    }
+    val schema = StructType(List(StructField("d", IntegerType, false),
+      StructField("time", TimestampType, false)).toArray)
+    withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "TIMESTAMP_MICROS") {
+      withTempPath { file =>
+        val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+        df.write.parquet(file.getCanonicalPath)
+        ("true" :: "false" :: Nil).foreach { vectorized =>
+          withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
+            val df2 = spark.read.parquet(file.getCanonicalPath)
+            checkAnswer(df2, df.collect().toSeq)
+          }
+        }
+      }
+    }
+  }
+
   test("Enabling/disabling merging partfiles when merging parquet schema") {
     def testSchemaMerging(expectedColumnNumber: Int): Unit = {
       withTempDir { dir =>

http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index ce99267..2cd2a60 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -24,6 +24,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
@@ -52,14 +53,10 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
       sqlSchema: StructType,
       parquetSchema: String,
       binaryAsString: Boolean,
-      int96AsTimestamp: Boolean,
-      writeLegacyParquetFormat: Boolean,
-      int64AsTimestampMillis: Boolean = false): Unit = {
-    val converter = new ParquetSchemaConverter(
+      int96AsTimestamp: Boolean): Unit = {
+    val converter = new ParquetToSparkSchemaConverter(
       assumeBinaryIsString = binaryAsString,
-      assumeInt96IsTimestamp = int96AsTimestamp,
-      writeLegacyParquetFormat = writeLegacyParquetFormat,
-      writeTimestampInMillis = int64AsTimestampMillis)
+      assumeInt96IsTimestamp = int96AsTimestamp)
 
     test(s"sql <= parquet: $testName") {
       val actual = converter.convert(MessageTypeParser.parseMessageType(parquetSchema))
@@ -77,15 +74,12 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
       testName: String,
       sqlSchema: StructType,
       parquetSchema: String,
-      binaryAsString: Boolean,
-      int96AsTimestamp: Boolean,
       writeLegacyParquetFormat: Boolean,
-      int64AsTimestampMillis: Boolean = false): Unit = {
-    val converter = new ParquetSchemaConverter(
-      assumeBinaryIsString = binaryAsString,
-      assumeInt96IsTimestamp = int96AsTimestamp,
+      outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
+        SQLConf.ParquetOutputTimestampType.INT96): Unit = {
+    val converter = new SparkToParquetSchemaConverter(
       writeLegacyParquetFormat = writeLegacyParquetFormat,
-      writeTimestampInMillis = int64AsTimestampMillis)
+      outputTimestampType = outputTimestampType)
 
     test(s"sql => parquet: $testName") {
       val actual = converter.convert(sqlSchema)
@@ -102,25 +96,22 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
       binaryAsString: Boolean,
       int96AsTimestamp: Boolean,
       writeLegacyParquetFormat: Boolean,
-      int64AsTimestampMillis: Boolean = false): Unit = {
+      outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
+        SQLConf.ParquetOutputTimestampType.INT96): Unit = {
 
     testCatalystToParquet(
       testName,
       sqlSchema,
       parquetSchema,
-      binaryAsString,
-      int96AsTimestamp,
       writeLegacyParquetFormat,
-      int64AsTimestampMillis)
+      outputTimestampType)
 
     testParquetToCatalyst(
       testName,
       sqlSchema,
       parquetSchema,
       binaryAsString,
-      int96AsTimestamp,
-      writeLegacyParquetFormat,
-      int64AsTimestampMillis)
+      int96AsTimestamp)
   }
 }
 
@@ -411,8 +402,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
     """.stripMargin,
     binaryAsString = true,
-    int96AsTimestamp = true,
-    writeLegacyParquetFormat = true)
+    int96AsTimestamp = true)
 
   testParquetToCatalyst(
     "Backwards-compatibility: LIST with nullable element type - 2",
@@ -430,8 +420,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
     """.stripMargin,
     binaryAsString = true,
-    int96AsTimestamp = true,
-    writeLegacyParquetFormat = true)
+    int96AsTimestamp = true)
 
   testParquetToCatalyst(
     "Backwards-compatibility: LIST with non-nullable element type - 1 - standard",
@@ -446,8 +435,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
     """.stripMargin,
     binaryAsString = true,
-    int96AsTimestamp = true,
-    writeLegacyParquetFormat = true)
+    int96AsTimestamp = true)
 
   testParquetToCatalyst(
     "Backwards-compatibility: LIST with non-nullable element type - 2",
@@ -462,8 +450,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
     """.stripMargin,
     binaryAsString = true,
-    int96AsTimestamp = true,
-    writeLegacyParquetFormat = true)
+    int96AsTimestamp = true)
 
   testParquetToCatalyst(
     "Backwards-compatibility: LIST with non-nullable element type - 3",
@@ -476,8 +463,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
     """.stripMargin,
     binaryAsString = true,
-    int96AsTimestamp = true,
-    writeLegacyParquetFormat = true)
+    int96AsTimestamp = true)
 
   testParquetToCatalyst(
     "Backwards-compatibility: LIST with non-nullable element type - 4",
@@ -500,8 +486,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
     """.stripMargin,
     binaryAsString = true,
-    int96AsTimestamp = true,
-    writeLegacyParquetFormat = true)
+    int96AsTimestamp = true)
 
   testParquetToCatalyst(
     "Backwards-compatibility: LIST with non-nullable element type - 5 - parquet-avro style",
@@ -522,8 +507,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
     """.stripMargin,
     binaryAsString = true,
-    int96AsTimestamp = true,
-    writeLegacyParquetFormat = true)
+    int96AsTimestamp = true)
 
   testParquetToCatalyst(
     "Backwards-compatibility: LIST with non-nullable element type - 6 - parquet-thrift style",
@@ -544,8 +528,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
     """.stripMargin,
     binaryAsString = true,
-    int96AsTimestamp = true,
-    writeLegacyParquetFormat = true)
+    int96AsTimestamp = true)
 
   testParquetToCatalyst(
     "Backwards-compatibility: LIST with non-nullable element type 7 - " +
@@ -557,8 +540,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
     """.stripMargin,
     binaryAsString = true,
-    int96AsTimestamp = true,
-    writeLegacyParquetFormat = true)
+    int96AsTimestamp = true)
 
   testParquetToCatalyst(
     "Backwards-compatibility: LIST with non-nullable element type 8 - " +
@@ -580,8 +562,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
     """.stripMargin,
     binaryAsString = true,
-    int96AsTimestamp = true,
-    writeLegacyParquetFormat = true)
+    int96AsTimestamp = true)
 
   // =======================================================
   // Tests for converting Catalyst ArrayType to Parquet LIST
@@ -602,8 +583,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |  }
       |}
     """.stripMargin,
-    binaryAsString = true,
-    int96AsTimestamp = true,
     writeLegacyParquetFormat = false)
 
   testCatalystToParquet(
@@ -621,8 +600,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |  }
       |}
     """.stripMargin,
-    binaryAsString = true,
-    int96AsTimestamp = true,
     writeLegacyParquetFormat = true)
 
   testCatalystToParquet(
@@ -640,8 +617,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |  }
       |}
     """.stripMargin,
-    binaryAsString = true,
-    int96AsTimestamp = true,
     writeLegacyParquetFormat = false)
 
   testCatalystToParquet(
@@ -657,8 +632,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |  }
       |}
     """.stripMargin,
-    binaryAsString = true,
-    int96AsTimestamp = true,
     writeLegacyParquetFormat = true)
 
   // ====================================================
@@ -682,8 +655,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
     """.stripMargin,
     binaryAsString = true,
-    int96AsTimestamp = true,
-    writeLegacyParquetFormat = true)
+    int96AsTimestamp = true)
 
   testParquetToCatalyst(
     "Backwards-compatibility: MAP with non-nullable value type - 2",
@@ -702,8 +674,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
     """.stripMargin,
     binaryAsString = true,
-    int96AsTimestamp = true,
-    writeLegacyParquetFormat = true)
+    int96AsTimestamp = true)
 
   testParquetToCatalyst(
     "Backwards-compatibility: MAP with non-nullable value type - 3 - prior to 1.4.x",
@@ -722,8 +693,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
     """.stripMargin,
     binaryAsString = true,
-    int96AsTimestamp = true,
-    writeLegacyParquetFormat = true)
+    int96AsTimestamp = true)
 
   testParquetToCatalyst(
     "Backwards-compatibility: MAP with nullable value type - 1 - standard",
@@ -742,8 +712,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
     """.stripMargin,
     binaryAsString = true,
-    int96AsTimestamp = true,
-    writeLegacyParquetFormat = true)
+    int96AsTimestamp = true)
 
   testParquetToCatalyst(
     "Backwards-compatibility: MAP with nullable value type - 2",
@@ -762,8 +731,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
     """.stripMargin,
     binaryAsString = true,
-    int96AsTimestamp = true,
-    writeLegacyParquetFormat = true)
+    int96AsTimestamp = true)
 
   testParquetToCatalyst(
     "Backwards-compatibility: MAP with nullable value type - 3 - parquet-avro style",
@@ -782,8 +750,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
     """.stripMargin,
     binaryAsString = true,
-    int96AsTimestamp = true,
-    writeLegacyParquetFormat = true)
+    int96AsTimestamp = true)
 
   // ====================================================
   // Tests for converting Catalyst MapType to Parquet Map
@@ -805,8 +772,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |  }
       |}
     """.stripMargin,
-    binaryAsString = true,
-    int96AsTimestamp = true,
     writeLegacyParquetFormat = false)
 
   testCatalystToParquet(
@@ -825,8 +790,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |  }
       |}
     """.stripMargin,
-    binaryAsString = true,
-    int96AsTimestamp = true,
     writeLegacyParquetFormat = true)
 
   testCatalystToParquet(
@@ -845,8 +808,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |  }
       |}
     """.stripMargin,
-    binaryAsString = true,
-    int96AsTimestamp = true,
     writeLegacyParquetFormat = false)
 
   testCatalystToParquet(
@@ -865,8 +826,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |  }
       |}
     """.stripMargin,
-    binaryAsString = true,
-    int96AsTimestamp = true,
     writeLegacyParquetFormat = true)
 
   // =================================
@@ -982,7 +941,19 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     binaryAsString = true,
     int96AsTimestamp = false,
     writeLegacyParquetFormat = true,
-    int64AsTimestampMillis = true)
+    outputTimestampType = SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS)
+
+  testSchema(
+    "Timestamp written and read as INT64 with TIMESTAMP_MICROS",
+    StructType(Seq(StructField("f1", TimestampType))),
+    """message root {
+      |  optional INT64 f1 (TIMESTAMP_MICROS);
+      |}
+    """.stripMargin,
+    binaryAsString = true,
+    int96AsTimestamp = false,
+    writeLegacyParquetFormat = true,
+    outputTimestampType = SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS)
 
   private def testSchemaClipping(
       testName: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index 85efca3..f05f572 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -124,7 +124,7 @@ private[sql] trait ParquetTest extends SQLTestUtils {
 
   protected def writeMetadata(
       schema: StructType, path: Path, configuration: Configuration): Unit = {
-    val parquetSchema = new ParquetSchemaConverter().convert(schema)
+    val parquetSchema = new SparkToParquetSchemaConverter().convert(schema)
     val extraMetadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schema.json).asJava
     val createdBy = s"Apache Spark ${org.apache.spark.SPARK_VERSION}"
     val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, createdBy)

http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 205c303..f9d75fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -281,4 +281,32 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
     assert(null == spark.conf.get("spark.sql.nonexistent", null))
     assert("<undefined>" == spark.conf.get("spark.sql.nonexistent", "<undefined>"))
   }
+
+  test("SPARK-10365: PARQUET_OUTPUT_TIMESTAMP_TYPE") {
+    spark.sessionState.conf.clear()
+
+    // check default value
+    assert(spark.sessionState.conf.parquetOutputTimestampType ==
+      SQLConf.ParquetOutputTimestampType.INT96)
+
+    // PARQUET_INT64_AS_TIMESTAMP_MILLIS should be respected.
+    spark.sessionState.conf.setConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS, true)
+    assert(spark.sessionState.conf.parquetOutputTimestampType ==
+      SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS)
+
+    // PARQUET_OUTPUT_TIMESTAMP_TYPE has higher priority over PARQUET_INT64_AS_TIMESTAMP_MILLIS
+    spark.sessionState.conf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "timestamp_micros")
+    assert(spark.sessionState.conf.parquetOutputTimestampType ==
+      SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS)
+    spark.sessionState.conf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "int96")
+    assert(spark.sessionState.conf.parquetOutputTimestampType ==
+      SQLConf.ParquetOutputTimestampType.INT96)
+
+    // test invalid conf value
+    intercept[IllegalArgumentException] {
+      spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, "invalid")
+    }
+
+    spark.sessionState.conf.clear()
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org