Posted to issues@spark.apache.org by "Xinli Shang (JIRA)" <ji...@apache.org> on 2018/10/26 22:49:00 UTC

[jira] [Updated] (SPARK-25858) Passing Field Metadata to Parquet

     [ https://issues.apache.org/jira/browse/SPARK-25858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xinli Shang updated SPARK-25858:
--------------------------------
    Description: 
h1. Problem Statement

The Spark WriteSupport class for Parquet is hardcoded to org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport and is not configurable. This class currently does not carry over the field metadata in StructType to Parquet's MessageType. However, Parquet column encryption (PARQUET-1396, PARQUET-1178) requires the field metadata to be present inside the MessageType, so that it can be used to control column encryption.
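
For context, this metadata originates on the Spark side as StructField metadata. A minimal sketch of how a caller might tag a column for encryption (the "encrypted" key is an assumed convention for illustration, not an existing Spark or Parquet constant):

    import org.apache.spark.sql.types._

    // Attach metadata to a sensitive column. Under current Spark, this map is
    // dropped when the StructType is converted to a Parquet MessageType.
    val encMeta = new MetadataBuilder()
      .putBoolean("encrypted", true) // assumed marker read by the encryption layer
      .build()

    val schema = StructType(Seq(
      StructField("ssn", StringType, nullable = false, encMeta),
      StructField("name", StringType)
    ))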
h1. Technical Solution
 # Extend the SparkToParquetSchemaConverter class and override its convert() method to carry over the field metadata.
 # Extend ParquetWriteSupport to use the extended converter from #1. Extending, rather than modifying, the built-in WriteSupport mitigates risk.
 # Change Spark to make the WriteSupport class configurable, so users can opt into the extended WriteSupport from #2. The default remains org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.

h1. Technical Details
h2. Extend SparkToParquetSchemaConverter class

class SparkToParquetMetadataSchemaConverter(configuration: Configuration)
    extends SparkToParquetSchemaConverter(configuration) {

  override def convert(catalystSchema: StructType): MessageType = {
    Types
      .buildMessage()
      .addFields(catalystSchema.map(convertFieldWithMetadata): _*)
      .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
  }

  // Wrap each converted field in an ExtType (PARQUET-1397), which carries the
  // StructField's metadata map into the Parquet schema.
  private def convertFieldWithMetadata(field: StructField): Type = {
    val extField = new ExtType[Any](convertField(field))
    val metaBuilder = new MetadataBuilder().withMetadata(field.metadata)
    extField.setMetadata(metaBuilder.getMap)
    extField
  }
}
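
A short usage sketch of the converter above (ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME is "spark_schema"; the ExtType wrapper is assumed to come from the PARQUET-1397 work):

    import org.apache.hadoop.conf.Configuration
    import org.apache.parquet.schema.MessageType

    // Each field of the resulting MessageType is an ExtType that carries the
    // corresponding StructField's metadata.
    val converter = new SparkToParquetMetadataSchemaConverter(new Configuration())
    val parquetSchema: MessageType = converter.convert(schema)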
h2. Extend ParquetWriteSupport

class CryptoParquetWriteSupport extends ParquetWriteSupport {

  override def init(configuration: Configuration): WriteContext = {
    // Use the metadata-preserving converter instead of the default one.
    val converter = new SparkToParquetMetadataSchemaConverter(configuration)
    createContext(configuration, converter)
  }
}
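
Note that this assumes ParquetWriteSupport exposes a createContext(configuration, converter) factory; in stock Spark the WriteContext is assembled inline in init(). A sketch of the implied hook, under that assumption:

    import scala.collection.JavaConverters._
    import org.apache.hadoop.conf.Configuration
    import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
    import org.apache.spark.sql.types.StructType

    // Hypothetical protected factory on ParquetWriteSupport: the same schema
    // plumbing init() does today, but with a pluggable schema converter.
    protected def createContext(
        configuration: Configuration,
        converter: SparkToParquetSchemaConverter): WriteContext = {
      val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
      val messageType = converter.convert(StructType.fromString(schemaString))
      new WriteContext(messageType, Map.empty[String, String].asJava)
    }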
h2. Make WriteSupport configurable

class ParquetFileFormat {

  override def prepareWrite(...): OutputWriterFactory = {
    ...
    // Install the default WriteSupport only when the user has not configured one.
    if (conf.get(ParquetOutputFormat.WRITE_SUPPORT_CLASS) == null) {
      ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
    }
    ...
  }
}
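
With that guard in place, a job can opt into the extended WriteSupport through the standard parquet-hadoop key (ParquetOutputFormat.WRITE_SUPPORT_CLASS, i.e. "parquet.write.support.class"); a usage sketch:

    import org.apache.parquet.hadoop.ParquetOutputFormat

    // Route Spark's Parquet writes through the extended WriteSupport from step 2.
    spark.sparkContext.hadoopConfiguration.set(
      ParquetOutputFormat.WRITE_SUPPORT_CLASS,
      classOf[CryptoParquetWriteSupport].getName)

    df.write.parquet("/path/to/output")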
h1. Verification

The [ParquetHelloWorld.java|https://github.com/shangxinli/parquet-writesupport-extensions/blob/master/src/main/java/com/uber/ParquetHelloWorld.java] in the GitHub repository [parquet-writesupport-extensions|https://github.com/shangxinli/parquet-writesupport-extensions] contains a sample verification of passing down the field metadata and performing column encryption.
h1. Dependency
 * PARQUET-1178
 * PARQUET-1396
 * PARQUET-1397


> Passing Field Metadata to Parquet
> ---------------------------------
>
>                 Key: SPARK-25858
>                 URL: https://issues.apache.org/jira/browse/SPARK-25858
>             Project: Spark
>          Issue Type: New Feature
>          Components: Input/Output
>    Affects Versions: 2.3.2
>            Reporter: Xinli Shang
>            Priority: Major
>


