Posted to commits@spark.apache.org by we...@apache.org on 2017/11/11 21:40:45 UTC
spark git commit: [SPARK-10365][SQL] Support Parquet logical type TIMESTAMP_MICROS
Repository: spark
Updated Branches:
refs/heads/master d6ee69e77 -> 21a7bfd5c
[SPARK-10365][SQL] Support Parquet logical type TIMESTAMP_MICROS
## What changes were proposed in this pull request?
This PR makes Spark able to read Parquet TIMESTAMP_MICROS values, and adds a new config that allows Spark to write timestamp values to Parquet as the TIMESTAMP_MICROS type.
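For illustration, a minimal end-to-end sketch of the new config in action; the session setup and output path are assumptions, not part of the patch:

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ts-micros-demo").master("local[*]").getOrCreate()
// Write timestamps as INT64 annotated with TIMESTAMP_MICROS instead of the INT96 default.
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")

import spark.implicits._
val df = Seq((1, Timestamp.valueOf("2017-11-11 21:40:45.123456"))).toDF("id", "ts")
df.write.mode("overwrite").parquet("/tmp/ts_micros")
spark.read.parquet("/tmp/ts_micros").show(truncate = false)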
## How was this patch tested?
new test
Author: Wenchen Fan <we...@databricks.com>
Closes #19702 from cloud-fan/parquet.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21a7bfd5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21a7bfd5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21a7bfd5
Branch: refs/heads/master
Commit: 21a7bfd5c324e6c82152229f1394f26afeae771c
Parents: d6ee69e
Author: Wenchen Fan <we...@databricks.com>
Authored: Sat Nov 11 22:40:26 2017 +0100
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sat Nov 11 22:40:26 2017 +0100
----------------------------------------------------------------------
.../org/apache/spark/sql/internal/SQLConf.scala | 30 ++++-
.../SpecificParquetRecordReaderBase.java | 4 +-
.../parquet/VectorizedColumnReader.java | 26 +++--
.../parquet/VectorizedParquetRecordReader.java | 11 +-
.../datasources/parquet/ParquetFileFormat.scala | 51 +++------
.../parquet/ParquetReadSupport.scala | 4 +-
.../parquet/ParquetRecordMaterializer.scala | 4 +-
.../parquet/ParquetRowConverter.scala | 15 ++-
.../parquet/ParquetSchemaConverter.scala | 92 +++++++--------
.../parquet/ParquetWriteSupport.scala | 48 ++++----
.../datasources/parquet/ParquetIOSuite.scala | 5 +-
.../datasources/parquet/ParquetQuerySuite.scala | 22 ++++
.../parquet/ParquetSchemaSuite.scala | 111 +++++++------------
.../datasources/parquet/ParquetTest.scala | 2 +-
.../spark/sql/internal/SQLConfSuite.scala | 28 +++++
15 files changed, 249 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a04f877..831ef62 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -285,8 +285,24 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ object ParquetOutputTimestampType extends Enumeration {
+ val INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS = Value
+ }
+
+ val PARQUET_OUTPUT_TIMESTAMP_TYPE = buildConf("spark.sql.parquet.outputTimestampType")
+ .doc("Sets which Parquet timestamp type to use when Spark writes data to Parquet files. " +
+ "INT96 is a non-standard but commonly used timestamp type in Parquet. TIMESTAMP_MICROS " +
+ "is a standard timestamp type in Parquet, which stores number of microseconds from the " +
+ "Unix epoch. TIMESTAMP_MILLIS is also standard, but with millisecond precision, which " +
+ "means Spark has to truncate the microsecond portion of its timestamp value.")
+ .stringConf
+ .transform(_.toUpperCase(Locale.ROOT))
+ .checkValues(ParquetOutputTimestampType.values.map(_.toString))
+ .createWithDefault(ParquetOutputTimestampType.INT96.toString)
+
val PARQUET_INT64_AS_TIMESTAMP_MILLIS = buildConf("spark.sql.parquet.int64AsTimestampMillis")
- .doc("When true, timestamp values will be stored as INT64 with TIMESTAMP_MILLIS as the " +
+ .doc(s"(Deprecated since Spark 2.3, please set ${PARQUET_OUTPUT_TIMESTAMP_TYPE.key}.) " +
+ "When true, timestamp values will be stored as INT64 with TIMESTAMP_MILLIS as the " +
"extended type. In this mode, the microsecond portion of the timestamp value will be" +
"truncated.")
.booleanConf
@@ -1143,6 +1159,18 @@ class SQLConf extends Serializable with Logging {
def isParquetINT64AsTimestampMillis: Boolean = getConf(PARQUET_INT64_AS_TIMESTAMP_MILLIS)
+ def parquetOutputTimestampType: ParquetOutputTimestampType.Value = {
+ val isOutputTimestampTypeSet = settings.containsKey(PARQUET_OUTPUT_TIMESTAMP_TYPE.key)
+ if (!isOutputTimestampTypeSet && isParquetINT64AsTimestampMillis) {
+ // If PARQUET_OUTPUT_TIMESTAMP_TYPE is not set and PARQUET_INT64_AS_TIMESTAMP_MILLIS is set,
+ // respect PARQUET_INT64_AS_TIMESTAMP_MILLIS and use TIMESTAMP_MILLIS. Otherwise,
+ // PARQUET_OUTPUT_TIMESTAMP_TYPE has higher priority.
+ ParquetOutputTimestampType.TIMESTAMP_MILLIS
+ } else {
+ ParquetOutputTimestampType.withName(getConf(PARQUET_OUTPUT_TIMESTAMP_TYPE))
+ }
+ }
+
def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT)
def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)
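The new getter above gives the deprecated flag effect only when the new key has not been set explicitly. A standalone sketch of that resolution logic, with illustrative names rather than Spark's internal API:

object OutputTimestampType extends Enumeration {
  val INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS = Value
}

// Mirrors parquetOutputTimestampType: the deprecated boolean only wins when the
// new config is left unset; otherwise the configured value (default INT96) is used.
def resolveOutputTimestampType(
    outputTypeExplicitlySet: Boolean,
    configuredOutputType: OutputTimestampType.Value,
    int64AsTimestampMillis: Boolean): OutputTimestampType.Value = {
  if (!outputTypeExplicitlySet && int64AsTimestampMillis) {
    OutputTimestampType.TIMESTAMP_MILLIS
  } else {
    configuredOutputType
  }
}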
http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 5a810ca..80c2f49 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -197,8 +197,6 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
Configuration config = new Configuration();
config.set("spark.sql.parquet.binaryAsString", "false");
config.set("spark.sql.parquet.int96AsTimestamp", "false");
- config.set("spark.sql.parquet.writeLegacyFormat", "false");
- config.set("spark.sql.parquet.int64AsTimestampMillis", "false");
this.file = new Path(path);
long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
@@ -224,7 +222,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
this.requestedSchema = ParquetSchemaConverter.EMPTY_MESSAGE();
}
}
- this.sparkSchema = new ParquetSchemaConverter(config).convert(requestedSchema);
+ this.sparkSchema = new ParquetToSparkSchemaConverter(config).convert(requestedSchema);
this.reader = new ParquetFileReader(
config, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
for (BlockMetaData block : blocks) {
http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 3c8d766..0f1f470 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -26,6 +26,7 @@ import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.*;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
@@ -91,11 +92,15 @@ public class VectorizedColumnReader {
private final PageReader pageReader;
private final ColumnDescriptor descriptor;
+ private final OriginalType originalType;
- public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader)
- throws IOException {
+ public VectorizedColumnReader(
+ ColumnDescriptor descriptor,
+ OriginalType originalType,
+ PageReader pageReader) throws IOException {
this.descriptor = descriptor;
this.pageReader = pageReader;
+ this.originalType = originalType;
this.maxDefLevel = descriptor.getMaxDefinitionLevel();
DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
@@ -158,12 +163,12 @@ public class VectorizedColumnReader {
defColumn.readIntegers(
num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
- // Timestamp values encoded as INT64 can't be lazily decoded as we need to post process
+ // TIMESTAMP_MILLIS encoded as INT64 can't be lazily decoded as we need to post process
// the values to add microseconds precision.
if (column.hasDictionary() || (rowId == 0 &&
(descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32 ||
(descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64 &&
- column.dataType() != DataTypes.TimestampType) ||
+ originalType != OriginalType.TIMESTAMP_MILLIS) ||
descriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT ||
descriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE ||
descriptor.getType() == PrimitiveType.PrimitiveTypeName.BINARY))) {
@@ -253,21 +258,21 @@ public class VectorizedColumnReader {
case INT64:
if (column.dataType() == DataTypes.LongType ||
- DecimalType.is64BitDecimalType(column.dataType())) {
+ DecimalType.is64BitDecimalType(column.dataType()) ||
+ originalType == OriginalType.TIMESTAMP_MICROS) {
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
}
}
- } else if (column.dataType() == DataTypes.TimestampType) {
+ } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
column.putLong(i,
DateTimeUtils.fromMillis(dictionary.decodeToLong(dictionaryIds.getDictId(i))));
}
}
- }
- else {
+ } else {
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
}
break;
@@ -377,10 +382,11 @@ public class VectorizedColumnReader {
private void readLongBatch(int rowId, int num, WritableColumnVector column) {
// This is where we implement support for the valid type conversions.
if (column.dataType() == DataTypes.LongType ||
- DecimalType.is64BitDecimalType(column.dataType())) {
+ DecimalType.is64BitDecimalType(column.dataType()) ||
+ originalType == OriginalType.TIMESTAMP_MICROS) {
defColumn.readLongs(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
- } else if (column.dataType() == DataTypes.TimestampType) {
+ } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong()));
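The TIMESTAMP_MILLIS branches above cannot be decoded lazily because Catalyst's TimestampType stores microseconds, so every INT64 millisecond value has to be scaled on read, while TIMESTAMP_MICROS values already match the internal representation and are copied as plain longs. A trivial sketch of the scaling that DateTimeUtils.fromMillis is used for here, assuming microsecond-based Catalyst timestamps:

// Illustrative only: a millisecond Parquet value becomes a Catalyst microsecond value.
def millisToMicros(millis: Long): Long = millis * 1000L

assert(millisToMicros(1510436445123L) == 1510436445123000L)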
http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 0cacf0c..e827229 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -165,8 +165,10 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
// Columns 0,1: data columns
// Column 2: partitionValues[0]
// Column 3: partitionValues[1]
- public void initBatch(MemoryMode memMode, StructType partitionColumns,
- InternalRow partitionValues) {
+ public void initBatch(
+ MemoryMode memMode,
+ StructType partitionColumns,
+ InternalRow partitionValues) {
StructType batchSchema = new StructType();
for (StructField f: sparkSchema.fields()) {
batchSchema = batchSchema.add(f);
@@ -281,11 +283,12 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
+ rowsReturned + " out of " + totalRowCount);
}
List<ColumnDescriptor> columns = requestedSchema.getColumns();
+ List<Type> types = requestedSchema.asGroupType().getFields();
columnReaders = new VectorizedColumnReader[columns.size()];
for (int i = 0; i < columns.size(); ++i) {
if (missingColumns[i]) continue;
- columnReaders[i] = new VectorizedColumnReader(columns.get(i),
- pages.getPageReader(columns.get(i)));
+ columnReaders[i] = new VectorizedColumnReader(
+ columns.get(i), types.get(i).getOriginalType(), pages.getPageReader(columns.get(i)));
}
totalCountLoadedSoFar += pages.getRowCount();
}
http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 61bd65d..a48f8d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -111,23 +111,15 @@ class ParquetFileFormat
// This metadata is only useful for detecting optional columns when pushdowning filters.
ParquetWriteSupport.setSchema(dataSchema, conf)
- // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
- // and `CatalystWriteSupport` (writing actual rows to Parquet files).
- conf.set(
- SQLConf.PARQUET_BINARY_AS_STRING.key,
- sparkSession.sessionState.conf.isParquetBinaryAsString.toString)
-
- conf.set(
- SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
- sparkSession.sessionState.conf.isParquetINT96AsTimestamp.toString)
-
+ // Sets flags for `ParquetWriteSupport`, which converts Catalyst schema to Parquet
+ // schema and writes actual rows to Parquet files.
conf.set(
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
sparkSession.sessionState.conf.writeLegacyParquetFormat.toString)
conf.set(
- SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key,
- sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis.toString)
+ SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
+ sparkSession.sessionState.conf.parquetOutputTimestampType.toString)
// Sets compression scheme
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
@@ -312,16 +304,13 @@ class ParquetFileFormat
ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
- // Sets flags for `CatalystSchemaConverter`
+ // Sets flags for `ParquetToSparkSchemaConverter`
hadoopConf.setBoolean(
SQLConf.PARQUET_BINARY_AS_STRING.key,
sparkSession.sessionState.conf.isParquetBinaryAsString)
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
- hadoopConf.setBoolean(
- SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key,
- sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis)
// Try to push down filters when filter push-down is enabled.
val pushed =
@@ -428,15 +417,9 @@ object ParquetFileFormat extends Logging {
private[parquet] def readSchema(
footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {
- def parseParquetSchema(schema: MessageType): StructType = {
- val converter = new ParquetSchemaConverter(
- sparkSession.sessionState.conf.isParquetBinaryAsString,
- sparkSession.sessionState.conf.isParquetBinaryAsString,
- sparkSession.sessionState.conf.writeLegacyParquetFormat,
- sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis)
-
- converter.convert(schema)
- }
+ val converter = new ParquetToSparkSchemaConverter(
+ sparkSession.sessionState.conf.isParquetBinaryAsString,
+ sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
val seen = mutable.HashSet[String]()
val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
@@ -447,7 +430,7 @@ object ParquetFileFormat extends Logging {
.get(ParquetReadSupport.SPARK_METADATA_KEY)
if (serializedSchema.isEmpty) {
// Falls back to Parquet schema if no Spark SQL schema found.
- Some(parseParquetSchema(metadata.getSchema))
+ Some(converter.convert(metadata.getSchema))
} else if (!seen.contains(serializedSchema.get)) {
seen += serializedSchema.get
@@ -470,7 +453,7 @@ object ParquetFileFormat extends Logging {
.map(_.asInstanceOf[StructType])
.getOrElse {
// Falls back to Parquet schema if Spark SQL schema can't be parsed.
- parseParquetSchema(metadata.getSchema)
+ converter.convert(metadata.getSchema)
})
} else {
None
@@ -538,8 +521,6 @@ object ParquetFileFormat extends Logging {
sparkSession: SparkSession): Option[StructType] = {
val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
- val writeTimestampInMillis = sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis
- val writeLegacyParquetFormat = sparkSession.sessionState.conf.writeLegacyParquetFormat
val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())
// !! HACK ALERT !!
@@ -579,13 +560,9 @@ object ParquetFileFormat extends Logging {
serializedConf.value, fakeFileStatuses, ignoreCorruptFiles)
// Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
- val converter =
- new ParquetSchemaConverter(
- assumeBinaryIsString = assumeBinaryIsString,
- assumeInt96IsTimestamp = assumeInt96IsTimestamp,
- writeLegacyParquetFormat = writeLegacyParquetFormat,
- writeTimestampInMillis = writeTimestampInMillis)
-
+ val converter = new ParquetToSparkSchemaConverter(
+ assumeBinaryIsString = assumeBinaryIsString,
+ assumeInt96IsTimestamp = assumeInt96IsTimestamp)
if (footers.isEmpty) {
Iterator.empty
} else {
@@ -625,7 +602,7 @@ object ParquetFileFormat extends Logging {
* a [[StructType]] converted from the [[MessageType]] stored in this footer.
*/
def readSchemaFromFooter(
- footer: Footer, converter: ParquetSchemaConverter): StructType = {
+ footer: Footer, converter: ParquetToSparkSchemaConverter): StructType = {
val fileMetaData = footer.getParquetMetadata.getFileMetaData
fileMetaData
.getKeyValueMetaData
http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index f1a35dd..2854cb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -95,7 +95,7 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo
new ParquetRecordMaterializer(
parquetRequestedSchema,
ParquetReadSupport.expandUDT(catalystRequestedSchema),
- new ParquetSchemaConverter(conf))
+ new ParquetToSparkSchemaConverter(conf))
}
}
@@ -270,7 +270,7 @@ private[parquet] object ParquetReadSupport {
private def clipParquetGroupFields(
parquetRecord: GroupType, structType: StructType): Seq[Type] = {
val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
- val toParquet = new ParquetSchemaConverter(writeLegacyParquetFormat = false)
+ val toParquet = new SparkToParquetSchemaConverter(writeLegacyParquetFormat = false)
structType.map { f =>
parquetFieldMap
.get(f.name)
http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
index 4e49a0d..793755e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
@@ -31,7 +31,9 @@ import org.apache.spark.sql.types.StructType
* @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters
*/
private[parquet] class ParquetRecordMaterializer(
- parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetSchemaConverter)
+ parquetSchema: MessageType,
+ catalystSchema: StructType,
+ schemaConverter: ParquetToSparkSchemaConverter)
extends RecordMaterializer[UnsafeRow] {
private val rootConverter =
http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 32e6c60..10f6c3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -27,7 +27,7 @@ import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type}
import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64}
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64, INT96}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
@@ -120,7 +120,7 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd
* @param updater An updater which propagates converted field values to the parent container
*/
private[parquet] class ParquetRowConverter(
- schemaConverter: ParquetSchemaConverter,
+ schemaConverter: ParquetToSparkSchemaConverter,
parquetType: GroupType,
catalystType: StructType,
updater: ParentContainerUpdater)
@@ -252,6 +252,13 @@ private[parquet] class ParquetRowConverter(
case StringType =>
new ParquetStringConverter(updater)
+ case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MICROS =>
+ new ParquetPrimitiveConverter(updater) {
+ override def addLong(value: Long): Unit = {
+ updater.setLong(value)
+ }
+ }
+
case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS =>
new ParquetPrimitiveConverter(updater) {
override def addLong(value: Long): Unit = {
@@ -259,8 +266,8 @@ private[parquet] class ParquetRowConverter(
}
}
- case TimestampType =>
- // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
+ // INT96 timestamp doesn't have a logical type, here we check the physical type instead.
+ case TimestampType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT96 =>
new ParquetPrimitiveConverter(updater) {
// Converts nanosecond timestamps stored as INT96
override def addBinary(value: Binary): Unit = {
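For reference, a back-of-the-envelope sketch of the 12-byte INT96 layout the converter above receives via addBinary: nanoseconds within the day (8 bytes) followed by a Julian day number (4 bytes), both little-endian. This is an illustrative decoder, not Spark's DateTimeUtils implementation:

import java.nio.{ByteBuffer, ByteOrder}

def int96ToMicros(bytes: Array[Byte]): Long = {
  require(bytes.length == 12, "Parquet INT96 timestamps are exactly 12 bytes")
  val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
  val timeOfDayNanos = buf.getLong   // first 8 bytes: nanoseconds within the day
  val julianDay = buf.getInt         // last 4 bytes: Julian day number
  val julianDayOfEpoch = 2440588L    // Julian day of 1970-01-01
  val microsPerDay = 24L * 60 * 60 * 1000 * 1000
  // Nanosecond precision beyond microseconds is dropped, matching Catalyst timestamps.
  (julianDay - julianDayOfEpoch) * microsPerDay + timeOfDayNanos / 1000
}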
http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index cd384d1..c61be07 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -30,49 +30,31 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
+
/**
- * This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]] and
- * vice versa.
+ * This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]].
*
* Parquet format backwards-compatibility rules are respected when converting Parquet
* [[MessageType]] schemas.
*
* @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
- * @constructor
+ *
* @param assumeBinaryIsString Whether unannotated BINARY fields should be assumed to be Spark SQL
- * [[StringType]] fields when converting Parquet a [[MessageType]] to Spark SQL
- * [[StructType]]. This argument only affects Parquet read path.
+ * [[StringType]] fields.
* @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL
- * [[TimestampType]] fields when converting Parquet a [[MessageType]] to Spark SQL
- * [[StructType]]. Note that Spark SQL [[TimestampType]] is similar to Hive timestamp, which
- * has optional nanosecond precision, but different from `TIME_MILLS` and `TIMESTAMP_MILLIS`
- * described in Parquet format spec. This argument only affects Parquet read path.
- * @param writeLegacyParquetFormat Whether to use legacy Parquet format compatible with Spark 1.4
- * and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]].
- * When set to false, use standard format defined in parquet-format spec. This argument only
- * affects Parquet write path.
- * @param writeTimestampInMillis Whether to write timestamp values as INT64 annotated by logical
- * type TIMESTAMP_MILLIS.
- *
+ * [[TimestampType]] fields.
*/
-private[parquet] class ParquetSchemaConverter(
+class ParquetToSparkSchemaConverter(
assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
- assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
- writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
- writeTimestampInMillis: Boolean = SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.defaultValue.get) {
+ assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get) {
def this(conf: SQLConf) = this(
assumeBinaryIsString = conf.isParquetBinaryAsString,
- assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
- writeLegacyParquetFormat = conf.writeLegacyParquetFormat,
- writeTimestampInMillis = conf.isParquetINT64AsTimestampMillis)
+ assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp)
def this(conf: Configuration) = this(
assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
- assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
- writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
- SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get.toString).toBoolean,
- writeTimestampInMillis = conf.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key).toBoolean)
+ assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean)
/**
@@ -165,6 +147,7 @@ private[parquet] class ParquetSchemaConverter(
case INT_64 | null => LongType
case DECIMAL => makeDecimalType(Decimal.MAX_LONG_DIGITS)
case UINT_64 => typeNotSupported()
+ case TIMESTAMP_MICROS => TimestampType
case TIMESTAMP_MILLIS => TimestampType
case _ => illegalType()
}
@@ -310,6 +293,30 @@ private[parquet] class ParquetSchemaConverter(
repeatedType.getName == s"${parentName}_tuple"
}
}
+}
+
+/**
+ * This converter class is used to convert Spark SQL [[StructType]] to Parquet [[MessageType]].
+ *
+ * @param writeLegacyParquetFormat Whether to use legacy Parquet format compatible with Spark 1.4
+ * and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]].
+ * When set to false, use standard format defined in parquet-format spec. This argument only
+ * affects Parquet write path.
+ * @param outputTimestampType which parquet timestamp type to use when writing.
+ */
+class SparkToParquetSchemaConverter(
+ writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
+ outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
+ SQLConf.ParquetOutputTimestampType.INT96) {
+
+ def this(conf: SQLConf) = this(
+ writeLegacyParquetFormat = conf.writeLegacyParquetFormat,
+ outputTimestampType = conf.parquetOutputTimestampType)
+
+ def this(conf: Configuration) = this(
+ writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean,
+ outputTimestampType = SQLConf.ParquetOutputTimestampType.withName(
+ conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)))
/**
* Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]].
@@ -363,7 +370,9 @@ private[parquet] class ParquetSchemaConverter(
case DateType =>
Types.primitive(INT32, repetition).as(DATE).named(field.name)
- // NOTE: Spark SQL TimestampType is NOT a well defined type in Parquet format spec.
+ // NOTE: Spark SQL can write timestamp values to Parquet using INT96, TIMESTAMP_MICROS or
+ // TIMESTAMP_MILLIS. TIMESTAMP_MICROS is recommended but INT96 is the default to keep the
+ // behavior same as before.
//
// As stated in PARQUET-323, Parquet `INT96` was originally introduced to represent nanosecond
// timestamp in Impala for some historical reasons. It's not recommended to be used for any
@@ -372,23 +381,18 @@ private[parquet] class ParquetSchemaConverter(
// `TIMESTAMP_MICROS` which are both logical types annotating `INT64`.
//
// Originally, Spark SQL uses the same nanosecond timestamp type as Impala and Hive. Starting
- // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store
- // a timestamp into a `Long`. This design decision is subject to change though, for example,
- // we may resort to microsecond precision in the future.
- //
- // For Parquet, we plan to write all `TimestampType` value as `TIMESTAMP_MICROS`, but it's
- // currently not implemented yet because parquet-mr 1.8.1 (the version we're currently using)
- // hasn't implemented `TIMESTAMP_MICROS` yet, however it supports TIMESTAMP_MILLIS. We will
- // encode timestamp values as TIMESTAMP_MILLIS annotating INT64 if
- // 'spark.sql.parquet.int64AsTimestampMillis' is set.
- //
- // TODO Converts `TIMESTAMP_MICROS` once parquet-mr implements that.
-
- case TimestampType if writeTimestampInMillis =>
- Types.primitive(INT64, repetition).as(TIMESTAMP_MILLIS).named(field.name)
-
+ // from Spark 1.5.0, we resort to a timestamp type with microsecond precision so that we can
+ // store a timestamp into a `Long`. This design decision is subject to change though, for
+ // example, we may resort to nanosecond precision in the future.
case TimestampType =>
- Types.primitive(INT96, repetition).named(field.name)
+ outputTimestampType match {
+ case SQLConf.ParquetOutputTimestampType.INT96 =>
+ Types.primitive(INT96, repetition).named(field.name)
+ case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
+ Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).named(field.name)
+ case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
+ Types.primitive(INT64, repetition).as(TIMESTAMP_MILLIS).named(field.name)
+ }
case BinaryType =>
Types.primitive(BINARY, repetition).named(field.name)
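To make the new TimestampType match above concrete, a hedged example of the schemas it produces for a single timestamp column; it assumes SparkToParquetSchemaConverter and the SQLConf enumeration introduced in this patch are importable as shown, and the expected shapes mirror the ParquetSchemaSuite cases further down:

import org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StructField, StructType, TimestampType}

val catalystSchema = StructType(Seq(StructField("f1", TimestampType)))
SQLConf.ParquetOutputTimestampType.values.foreach { t =>
  val converter = new SparkToParquetSchemaConverter(
    writeLegacyParquetFormat = false, outputTimestampType = t)
  // INT96            -> optional int96 f1
  // TIMESTAMP_MICROS -> optional int64 f1 (TIMESTAMP_MICROS)
  // TIMESTAMP_MILLIS -> optional int64 f1 (TIMESTAMP_MILLIS)
  println(s"$t -> ${converter.convert(catalystSchema)}")
}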
http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index 63a8666..af4e143 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -66,8 +66,8 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
// Whether to write data in legacy Parquet format compatible with Spark 1.4 and prior versions
private var writeLegacyParquetFormat: Boolean = _
- // Whether to write timestamp value with milliseconds precision.
- private var writeTimestampInMillis: Boolean = _
+ // Which parquet timestamp type to use when writing.
+ private var outputTimestampType: SQLConf.ParquetOutputTimestampType.Value = _
// Reusable byte array used to write timestamps as Parquet INT96 values
private val timestampBuffer = new Array[Byte](12)
@@ -84,15 +84,15 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean
}
- this.writeTimestampInMillis = {
- assert(configuration.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key) != null)
- configuration.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key).toBoolean
+ this.outputTimestampType = {
+ val key = SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key
+ assert(configuration.get(key) != null)
+ SQLConf.ParquetOutputTimestampType.withName(configuration.get(key))
}
-
this.rootFieldWriters = schema.map(_.dataType).map(makeWriter).toArray[ValueWriter]
- val messageType = new ParquetSchemaConverter(configuration).convert(schema)
+ val messageType = new SparkToParquetSchemaConverter(configuration).convert(schema)
val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava
logInfo(
@@ -163,25 +163,23 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
recordConsumer.addBinary(
Binary.fromReusedByteArray(row.getUTF8String(ordinal).getBytes))
- case TimestampType if writeTimestampInMillis =>
- (row: SpecializedGetters, ordinal: Int) =>
- val millis = DateTimeUtils.toMillis(row.getLong(ordinal))
- recordConsumer.addLong(millis)
-
case TimestampType =>
- (row: SpecializedGetters, ordinal: Int) => {
- // TODO Writes `TimestampType` values as `TIMESTAMP_MICROS` once parquet-mr implements it
- // Currently we only support timestamps stored as INT96, which is compatible with Hive
- // and Impala. However, INT96 is to be deprecated. We plan to support `TIMESTAMP_MICROS`
- // defined in the parquet-format spec. But up until writing, the most recent parquet-mr
- // version (1.8.1) hasn't implemented it yet.
-
- // NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond
- // precision. Nanosecond parts of timestamp values read from INT96 are simply stripped.
- val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
- val buf = ByteBuffer.wrap(timestampBuffer)
- buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
- recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
+ outputTimestampType match {
+ case SQLConf.ParquetOutputTimestampType.INT96 =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
+ val buf = ByteBuffer.wrap(timestampBuffer)
+ buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
+ recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
+
+ case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ recordConsumer.addLong(row.getLong(ordinal))
+
+ case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ val millis = DateTimeUtils.toMillis(row.getLong(ordinal))
+ recordConsumer.addLong(millis)
}
case BinaryType =>
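The INT96 writer branch above packs a Catalyst microsecond timestamp into the reusable 12-byte buffer as nanos-of-day followed by the Julian day, little-endian, the inverse of the read-side layout. A self-contained sketch of that packing with illustrative helpers, not DateTimeUtils.toJulianDay itself:

import java.nio.{ByteBuffer, ByteOrder}

def microsToInt96(micros: Long): Array[Byte] = {
  val julianDayOfEpoch = 2440588L                  // Julian day of 1970-01-01
  val microsPerDay = 24L * 60 * 60 * 1000 * 1000
  val julianDay = julianDayOfEpoch + Math.floorDiv(micros, microsPerDay)
  val timeOfDayNanos = Math.floorMod(micros, microsPerDay) * 1000
  ByteBuffer.allocate(12)
    .order(ByteOrder.LITTLE_ENDIAN)
    .putLong(timeOfDayNanos)
    .putInt(julianDay.toInt)
    .array()
}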
http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index d76990b..633cfde 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -110,12 +110,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
| required binary h(DECIMAL(32,0));
| required fixed_len_byte_array(32) i(DECIMAL(32,0));
| required int64 j(TIMESTAMP_MILLIS);
+ | required int64 k(TIMESTAMP_MICROS);
|}
""".stripMargin)
val expectedSparkTypes = Seq(ByteType, ShortType, DateType, DecimalType(1, 0),
DecimalType(10, 0), StringType, StringType, DecimalType(32, 0), DecimalType(32, 0),
- TimestampType)
+ TimestampType, TimestampType)
withTempPath { location =>
val path = new Path(location.getCanonicalPath)
@@ -380,7 +381,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)))
assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
- val expectedSchema = new ParquetSchemaConverter().convert(schema)
+ val expectedSchema = new SparkToParquetSchemaConverter().convert(schema)
val actualSchema = readFooter(path, hadoopConf).getFileMetaData.getSchema
actualSchema.checkContains(expectedSchema)
http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index e822e40..4c8c9ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -235,6 +235,28 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
+ test("SPARK-10365 timestamp written and read as INT64 - TIMESTAMP_MICROS") {
+ val data = (1 to 10).map { i =>
+ val ts = new java.sql.Timestamp(i)
+ ts.setNanos(2000)
+ Row(i, ts)
+ }
+ val schema = StructType(List(StructField("d", IntegerType, false),
+ StructField("time", TimestampType, false)).toArray)
+ withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "TIMESTAMP_MICROS") {
+ withTempPath { file =>
+ val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+ df.write.parquet(file.getCanonicalPath)
+ ("true" :: "false" :: Nil).foreach { vectorized =>
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
+ val df2 = spark.read.parquet(file.getCanonicalPath)
+ checkAnswer(df2, df.collect().toSeq)
+ }
+ }
+ }
+ }
+ }
+
test("Enabling/disabling merging partfiles when merging parquet schema") {
def testSchemaMerging(expectedColumnNumber: Int): Unit = {
withTempDir { dir =>
http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index ce99267..2cd2a60 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -24,6 +24,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser}
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -52,14 +53,10 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
sqlSchema: StructType,
parquetSchema: String,
binaryAsString: Boolean,
- int96AsTimestamp: Boolean,
- writeLegacyParquetFormat: Boolean,
- int64AsTimestampMillis: Boolean = false): Unit = {
- val converter = new ParquetSchemaConverter(
+ int96AsTimestamp: Boolean): Unit = {
+ val converter = new ParquetToSparkSchemaConverter(
assumeBinaryIsString = binaryAsString,
- assumeInt96IsTimestamp = int96AsTimestamp,
- writeLegacyParquetFormat = writeLegacyParquetFormat,
- writeTimestampInMillis = int64AsTimestampMillis)
+ assumeInt96IsTimestamp = int96AsTimestamp)
test(s"sql <= parquet: $testName") {
val actual = converter.convert(MessageTypeParser.parseMessageType(parquetSchema))
@@ -77,15 +74,12 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
testName: String,
sqlSchema: StructType,
parquetSchema: String,
- binaryAsString: Boolean,
- int96AsTimestamp: Boolean,
writeLegacyParquetFormat: Boolean,
- int64AsTimestampMillis: Boolean = false): Unit = {
- val converter = new ParquetSchemaConverter(
- assumeBinaryIsString = binaryAsString,
- assumeInt96IsTimestamp = int96AsTimestamp,
+ outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
+ SQLConf.ParquetOutputTimestampType.INT96): Unit = {
+ val converter = new SparkToParquetSchemaConverter(
writeLegacyParquetFormat = writeLegacyParquetFormat,
- writeTimestampInMillis = int64AsTimestampMillis)
+ outputTimestampType = outputTimestampType)
test(s"sql => parquet: $testName") {
val actual = converter.convert(sqlSchema)
@@ -102,25 +96,22 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
binaryAsString: Boolean,
int96AsTimestamp: Boolean,
writeLegacyParquetFormat: Boolean,
- int64AsTimestampMillis: Boolean = false): Unit = {
+ outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
+ SQLConf.ParquetOutputTimestampType.INT96): Unit = {
testCatalystToParquet(
testName,
sqlSchema,
parquetSchema,
- binaryAsString,
- int96AsTimestamp,
writeLegacyParquetFormat,
- int64AsTimestampMillis)
+ outputTimestampType)
testParquetToCatalyst(
testName,
sqlSchema,
parquetSchema,
binaryAsString,
- int96AsTimestamp,
- writeLegacyParquetFormat,
- int64AsTimestampMillis)
+ int96AsTimestamp)
}
}
@@ -411,8 +402,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
- int96AsTimestamp = true,
- writeLegacyParquetFormat = true)
+ int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with nullable element type - 2",
@@ -430,8 +420,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
- int96AsTimestamp = true,
- writeLegacyParquetFormat = true)
+ int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type - 1 - standard",
@@ -446,8 +435,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
- int96AsTimestamp = true,
- writeLegacyParquetFormat = true)
+ int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type - 2",
@@ -462,8 +450,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
- int96AsTimestamp = true,
- writeLegacyParquetFormat = true)
+ int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type - 3",
@@ -476,8 +463,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
- int96AsTimestamp = true,
- writeLegacyParquetFormat = true)
+ int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type - 4",
@@ -500,8 +486,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
- int96AsTimestamp = true,
- writeLegacyParquetFormat = true)
+ int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type - 5 - parquet-avro style",
@@ -522,8 +507,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
- int96AsTimestamp = true,
- writeLegacyParquetFormat = true)
+ int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type - 6 - parquet-thrift style",
@@ -544,8 +528,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
- int96AsTimestamp = true,
- writeLegacyParquetFormat = true)
+ int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type 7 - " +
@@ -557,8 +540,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
- int96AsTimestamp = true,
- writeLegacyParquetFormat = true)
+ int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: LIST with non-nullable element type 8 - " +
@@ -580,8 +562,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
- int96AsTimestamp = true,
- writeLegacyParquetFormat = true)
+ int96AsTimestamp = true)
// =======================================================
// Tests for converting Catalyst ArrayType to Parquet LIST
@@ -602,8 +583,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
- binaryAsString = true,
- int96AsTimestamp = true,
writeLegacyParquetFormat = false)
testCatalystToParquet(
@@ -621,8 +600,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
- binaryAsString = true,
- int96AsTimestamp = true,
writeLegacyParquetFormat = true)
testCatalystToParquet(
@@ -640,8 +617,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
- binaryAsString = true,
- int96AsTimestamp = true,
writeLegacyParquetFormat = false)
testCatalystToParquet(
@@ -657,8 +632,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
- binaryAsString = true,
- int96AsTimestamp = true,
writeLegacyParquetFormat = true)
// ====================================================
@@ -682,8 +655,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
- int96AsTimestamp = true,
- writeLegacyParquetFormat = true)
+ int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: MAP with non-nullable value type - 2",
@@ -702,8 +674,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
- int96AsTimestamp = true,
- writeLegacyParquetFormat = true)
+ int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: MAP with non-nullable value type - 3 - prior to 1.4.x",
@@ -722,8 +693,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
- int96AsTimestamp = true,
- writeLegacyParquetFormat = true)
+ int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: MAP with nullable value type - 1 - standard",
@@ -742,8 +712,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
- int96AsTimestamp = true,
- writeLegacyParquetFormat = true)
+ int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: MAP with nullable value type - 2",
@@ -762,8 +731,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
- int96AsTimestamp = true,
- writeLegacyParquetFormat = true)
+ int96AsTimestamp = true)
testParquetToCatalyst(
"Backwards-compatibility: MAP with nullable value type - 3 - parquet-avro style",
@@ -782,8 +750,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin,
binaryAsString = true,
- int96AsTimestamp = true,
- writeLegacyParquetFormat = true)
+ int96AsTimestamp = true)
// ====================================================
// Tests for converting Catalyst MapType to Parquet Map
@@ -805,8 +772,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
- binaryAsString = true,
- int96AsTimestamp = true,
writeLegacyParquetFormat = false)
testCatalystToParquet(
@@ -825,8 +790,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
- binaryAsString = true,
- int96AsTimestamp = true,
writeLegacyParquetFormat = true)
testCatalystToParquet(
@@ -845,8 +808,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
- binaryAsString = true,
- int96AsTimestamp = true,
writeLegacyParquetFormat = false)
testCatalystToParquet(
@@ -865,8 +826,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin,
- binaryAsString = true,
- int96AsTimestamp = true,
writeLegacyParquetFormat = true)
// =================================
@@ -982,7 +941,19 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
binaryAsString = true,
int96AsTimestamp = false,
writeLegacyParquetFormat = true,
- int64AsTimestampMillis = true)
+ outputTimestampType = SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS)
+
+ testSchema(
+ "Timestamp written and read as INT64 with TIMESTAMP_MICROS",
+ StructType(Seq(StructField("f1", TimestampType))),
+ """message root {
+ | optional INT64 f1 (TIMESTAMP_MICROS);
+ |}
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = false,
+ writeLegacyParquetFormat = true,
+ outputTimestampType = SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS)
private def testSchemaClipping(
testName: String,
http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index 85efca3..f05f572 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -124,7 +124,7 @@ private[sql] trait ParquetTest extends SQLTestUtils {
protected def writeMetadata(
schema: StructType, path: Path, configuration: Configuration): Unit = {
- val parquetSchema = new ParquetSchemaConverter().convert(schema)
+ val parquetSchema = new SparkToParquetSchemaConverter().convert(schema)
val extraMetadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schema.json).asJava
val createdBy = s"Apache Spark ${org.apache.spark.SPARK_VERSION}"
val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, createdBy)
http://git-wip-us.apache.org/repos/asf/spark/blob/21a7bfd5/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 205c303..f9d75fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -281,4 +281,32 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
assert(null == spark.conf.get("spark.sql.nonexistent", null))
assert("<undefined>" == spark.conf.get("spark.sql.nonexistent", "<undefined>"))
}
+
+ test("SPARK-10365: PARQUET_OUTPUT_TIMESTAMP_TYPE") {
+ spark.sessionState.conf.clear()
+
+ // check default value
+ assert(spark.sessionState.conf.parquetOutputTimestampType ==
+ SQLConf.ParquetOutputTimestampType.INT96)
+
+ // PARQUET_INT64_AS_TIMESTAMP_MILLIS should be respected.
+ spark.sessionState.conf.setConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS, true)
+ assert(spark.sessionState.conf.parquetOutputTimestampType ==
+ SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS)
+
+ // PARQUET_OUTPUT_TIMESTAMP_TYPE has higher priority over PARQUET_INT64_AS_TIMESTAMP_MILLIS
+ spark.sessionState.conf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "timestamp_micros")
+ assert(spark.sessionState.conf.parquetOutputTimestampType ==
+ SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS)
+ spark.sessionState.conf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "int96")
+ assert(spark.sessionState.conf.parquetOutputTimestampType ==
+ SQLConf.ParquetOutputTimestampType.INT96)
+
+ // test invalid conf value
+ intercept[IllegalArgumentException] {
+ spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, "invalid")
+ }
+
+ spark.sessionState.conf.clear()
+ }
}