Posted to issues@spark.apache.org by "Xinli Shang (JIRA)" <ji...@apache.org> on 2018/10/26 22:49:00 UTC
[jira] [Updated] (SPARK-25858) Passing Field Metadata to Parquet
[ https://issues.apache.org/jira/browse/SPARK-25858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xinli Shang updated SPARK-25858:
--------------------------------
Description:
h1. Problem Statement
The Spark WriteSupport class for Parquet is hardcoded to org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport and is not configurable. This class does not carry over the field metadata in StructType to Parquet's MessageType. However, Parquet column encryption (PARQUET-1396, PARQUET-1178) requires that field metadata inside the Parquet MessageType, so that the metadata can be used to control column encryption.
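For context, field metadata originates on the Spark side as StructField metadata. A minimal sketch of a schema that tags a column (the "encrypt" key is illustrative, not a Spark or Parquet convention); it is this per-field metadata that is currently lost during the StructType-to-MessageType conversion:

{code:scala}
import org.apache.spark.sql.types._

// A schema whose "ssn" field carries metadata; today this metadata is
// dropped when Spark converts the StructType to Parquet's MessageType.
val ssnMeta = new MetadataBuilder().putBoolean("encrypt", true).build()
val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("ssn", StringType, nullable = true, metadata = ssnMeta)))
{code}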
h1. Technical Solution
# Extend the SparkToParquetSchemaConverter class and override its convert() method to carry over the field metadata.
# Extend ParquetWriteSupport to use the extended converter from #1. Extending, rather than modifying, the built-in WriteSupport mitigates the risk of the change.
# Change Spark to make the WriteSupport class configurable, so that users can opt in to the extended WriteSupport from #2. The default remains org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.
h1. Technical Details
h2. Extend SparkToParquetSchemaConverter class
{code:scala}
class SparkToParquetMetadataSchemaConverter(conf: Configuration)
    extends SparkToParquetSchemaConverter(conf) {

  override def convert(catalystSchema: StructType): MessageType = {
    Types
      .buildMessage()
      .addFields(catalystSchema.map(convertFieldWithMetadata): _*)
      .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
  }

  // Wrap the converted Parquet field and attach the StructField's metadata.
  private def convertFieldWithMetadata(field: StructField): Type = {
    val extField = new ExtType[Any](convertField(field))
    val metaBuilder = new MetadataBuilder().withMetadata(field.metadata)
    extField.setMetadata(metaBuilder.getMap)
    extField
  }
}
{code}
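Stripped of the Spark and Parquet types, the carry-over is just a decorator that wraps a converted node and attaches the field's metadata map. A minimal, dependency-free sketch of that pattern (all names here are illustrative stand-ins, not Spark's or Parquet's):

```scala
// Self-contained sketch of the ExtType idea: wrap an existing type node
// and attach a metadata map to it. Field stands in for Parquet's Type.
case class Field(name: String)

class ExtField(val inner: Field) {
  private var metadata: Map[String, Any] = Map.empty
  def setMetadata(m: Map[String, Any]): Unit = { metadata = m }
  def getMetadata: Map[String, Any] = metadata
}

object CarryOverDemo {
  // Analogue of convertFieldWithMetadata: convert first, then attach
  // the field's metadata to the wrapped result.
  def convertWithMetadata(f: Field, meta: Map[String, Any]): ExtField = {
    val ext = new ExtField(f)
    ext.setMetadata(meta)
    ext
  }

  def main(args: Array[String]): Unit = {
    val ext = convertWithMetadata(Field("ssn"), Map("encrypt" -> true))
    println(ext.getMetadata("encrypt"))
  }
}
```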
h2. Extend ParquetWriteSupport
{code:scala}
class CryptoParquetWriteSupport extends ParquetWriteSupport {
  override def init(configuration: Configuration): WriteContext = {
    val converter = new SparkToParquetMetadataSchemaConverter(configuration)
    createContext(configuration, converter)
  }
}
{code}
h2. Make WriteSupport configurable
{code:scala}
class ParquetFileFormat {
  override def prepareWrite(...) = {
    ...
    // Only install the default WriteSupport if the user has not configured one.
    if (conf.get(ParquetOutputFormat.WRITE_SUPPORT_CLASS) == null) {
      ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
    }
    ...
  }
}
{code}
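With that change in place, a user could select the extended WriteSupport through the standard parquet-mr configuration key (ParquetOutputFormat.WRITE_SUPPORT_CLASS, i.e. "parquet.write.support.class"). A hypothetical usage sketch; the package name below is illustrative:

{code:scala}
// Hypothetical opt-in: point the parquet-mr WriteSupport key at the
// extended class. Omitting this keeps Spark's built-in ParquetWriteSupport.
spark.sparkContext.hadoopConfiguration.set(
  "parquet.write.support.class",             // ParquetOutputFormat.WRITE_SUPPORT_CLASS
  "com.example.CryptoParquetWriteSupport")   // illustrative package name
{code}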
h1. Verification
The [ParquetHelloWorld.java|https://github.com/shangxinli/parquet-writesupport-extensions/blob/master/src/main/java/com/uber/ParquetHelloWorld.java] example in the GitHub repository [parquet-writesupport-extensions|https://github.com/shangxinli/parquet-writesupport-extensions] demonstrates passing down the field metadata and performing column encryption.
h1. Dependency
* Parquet-1178
* Parquet-1396
* Parquet-1397
> Passing Field Metadata to Parquet
> ---------------------------------
>
> Key: SPARK-25858
> URL: https://issues.apache.org/jira/browse/SPARK-25858
> Project: Spark
> Issue Type: New Feature
> Components: Input/Output
> Affects Versions: 2.3.2
> Reporter: Xinli Shang
> Priority: Major
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)