Posted to dev@parquet.apache.org by "Remek Zajac (JIRA)" <ji...@apache.org> on 2016/03/31 10:59:25 UTC
[jira] [Updated] (PARQUET-577) mandatory status of avro columns ignored
[ https://issues.apache.org/jira/browse/PARQUET-577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Remek Zajac updated PARQUET-577:
--------------------------------
Description:
The Avro spec's schema [resolution rules](https://avro.apache.org/docs/1.7.7/spec.html#schema_record) say:
"if the reader's record schema has a field with no default value, and writer's schema does not have a field with the same name, an error is signalled."
I can't find an implementation of this rule in parquet.avro, and in practice I observe it being ignored. I am using 1.6.0 because that is what we can get from Maven.
My writer's schema:
{code}
{
  "type" : "record",
  "name" : "SampleSchema_v1",
  "namespace" : "com.xxxx.spark",
  "fields" : [ {
    "name" : "stringField",
    "type" : "string",
    "doc" : "Sample string field"
  }, {
    "name" : "longField",
    "type" : "long",
    "doc" : "Sample long field"
  } ],
  "doc:" : "A sample/test schema"
}
{code}
My reader schema:
{code}
{
  "type" : "record",
  "name" : "SampleSchema_newDefaultlessCol",
  "namespace" : "com.xxxx.spark",
  "fields" : [ {
    "name" : "stringField",
    "type" : "string",
    "doc" : "Sample string field"
  }, {
    "name" : "longField",
    "type" : "long",
    "doc" : "Sample long field"
  }, {
    "name" : "mandatoryIntField",
    "type" : "int",
    "doc" : "Sample mandatory! int field"
  } ],
  "doc:" : "v1 + one extra column that has no default"
}
{code}
This is my test case:
{code}
"accept new column w/o a default [schema-evolution, undesired]" in new MockAvroParquetGrid {
  // TODO: the behaviour this test case exercises is UNDESIRED, i.e. a new column with no default value
  // should constitute an incompatible schema break; instead, this thing uses 0 for the default.
  // TODO: Ticket to track this: https://jira.xxxx.io/browse/ADR-610
  val inputSampleRecordsV1 = Seq(new SampleSchema_v1(s"string", 1))
  dao.writeParquet[SampleSchema_v1](
    SparkBase.sc.parallelize(inputSampleRecordsV1),
    SampleSchema_v1.SCHEMA$,
    parquetFolder
  )
  dao
    .readParquet[SampleSchema_newDefaultlessCol](parquetFolder, SampleSchema_newDefaultlessCol.SCHEMA$)
    .collect().toSeq.head
    .getMandatoryIntField must equalTo(0) // TODO: zero is an unwelcome guess
}
{code}
This is the implementation of writeParquet and readParquet
```
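import scala.reflect.ClassTag

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.{PairRDDFunctions, RDD}
// parquet-mr 1.6.0 still uses the pre-Apache `parquet.*` package names
import parquet.avro.{AvroParquetOutputFormat, AvroReadSupport, AvroWriteSupport}
import parquet.hadoop.{ParquetInputFormat, ParquetOutputFormat}
import parquet.hadoop.metadata.CompressionCodecName

// `sc` (the SparkContext) and `bucketDAO` are defined elsewhere in our code.

// Writes the RDD as GZIP-compressed Parquet using the given Avro (writer) schema.
def writeParquet[C](source: RDD[C], schema: org.apache.avro.Schema, dstPath: String)
                   (implicit ctag: ClassTag[C]): Unit = {
  val hadoopJob = Job.getInstance()
  ParquetOutputFormat.setWriteSupportClass(hadoopJob, classOf[AvroWriteSupport])
  ParquetOutputFormat.setCompression(hadoopJob, CompressionCodecName.GZIP)
  AvroWriteSupport.setSchema(hadoopJob.getConfiguration, schema)
  new PairRDDFunctions[Void, C](
    source.map(sourceRecord => (null, sourceRecord))
  ).saveAsNewAPIHadoopFile(
    bucketDAO.uri(dstPath),
    classOf[Void], // K
    ctag.runtimeClass.asInstanceOf[Class[C]], // V
    classOf[AvroParquetOutputFormat],
    hadoopJob.getConfiguration
  )
}

// Reads Parquet back into records of type C, using `schema` as the Avro reader schema.
def readParquet[C](srcPath: String, schema: org.apache.avro.Schema)
                  (implicit ctag: ClassTag[C]): RDD[C] = {
  val hadoopJob = Job.getInstance()
  ParquetInputFormat.setReadSupportClass(hadoopJob, classOf[AvroReadSupport[C]])
  AvroReadSupport.setAvroReadSchema(hadoopJob.getConfiguration, schema)
  sc.newAPIHadoopFile(
    bucketDAO.uri(srcPath),
    classOf[ParquetInputFormat[C]],
    classOf[Void], // K
    ctag.runtimeClass.asInstanceOf[Class[C]], // V
    hadoopJob.getConfiguration
  ).map { _._2 }
}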
```
We use avro-tools to generate Java classes from our Avro schemas:
java -jar /path/to/avro-tools-1.8.0.jar compile schema <schema file> <destination>
The test case harvests zeroes as values of mandatoryIntField.
Naively, I see a problem in the [indexed record converter](https://github.com/Parquet/parquet-mr/blob/master/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java#L103) in that it cheerfully accepts a condition doomed to fail, namely: the reader schema has a column with no default value that is absent from the writer schema.
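If the converter really does skip such a field rather than reject it, the zero is what one would expect to see: a primitive int member on the JVM starts out as 0 and keeps that value until something assigns it. A minimal illustration, assuming a hypothetical stand-in class (`SampleRecordStandIn` is not the code avro-tools generates, only a simplified model of it):

```scala
// Hypothetical stand-in for an avro-tools generated class; not the real generated code.
final class SampleRecordStandIn {
  var stringField: CharSequence = _ // reference fields start out as null
  var longField: Long = _           // primitive fields start out as 0
  var mandatoryIntField: Int = _
}

object UnsetFieldDemo {
  def main(args: Array[String]): Unit = {
    val record = new SampleRecordStandIn
    // Populate only the fields that exist in the writer schema.
    record.stringField = "string"
    record.longField = 1L
    // mandatoryIntField was never assigned, so it still holds the JVM default.
    println(record.mandatoryIntField) // prints 0
  }
}
```

This does not prove that the converter behaves this way; it only shows that an unassigned int field would be indistinguishable from a genuine 0 in the data.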
I am writing mainly to confirm my diagnosis and to get the intel on why it is implemented the way it is. Is it fixable (or do others depend on it as a feature)? Can people think of a workaround?
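One stop-gap I can imagine is a pre-flight check on our side that applies the quoted rule before reading. The sketch below is only that, under some assumptions: `DefaultlessFieldGuard` and its methods are hypothetical names of mine, it only looks at top-level record fields, and it relies on `Schema.Field.defaultValue()` returning null when no default is declared, which holds for Avro 1.7.x/1.8.x (later Avro versions expose this differently).

```scala
import org.apache.avro.Schema
import scala.collection.JavaConverters._

// Hypothetical helper, not part of Avro or parquet-avro.
object DefaultlessFieldGuard {

  /** Names of top-level reader fields that have no default and are absent from the writer schema. */
  def unresolvableFields(writer: Schema, reader: Schema): List[String] =
    reader.getFields.asScala.toList
      // Assumption: Avro 1.7.x/1.8.x, where defaultValue() is null if no default was declared.
      .filter(f => f.defaultValue() == null && writer.getField(f.name) == null)
      .map(_.name)

  /** Fails fast instead of letting the read quietly produce 0 for such fields. */
  def requireResolvable(writer: Schema, reader: Schema): Unit = {
    val missing = unresolvableFields(writer, reader)
    require(
      missing.isEmpty,
      s"Reader schema ${reader.getFullName} has fields with no default that are " +
        s"missing from writer schema ${writer.getFullName}: ${missing.mkString(", ")}"
    )
  }
}
```

In the test case above this would be called as `DefaultlessFieldGuard.requireResolvable(SampleSchema_v1.SCHEMA$, SampleSchema_newDefaultlessCol.SCHEMA$)` before `readParquet`. In real use the writer schema would have to be obtained from wherever it is recorded for the data being read, which this sketch does not cover.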
> mandatory status of avro columns ignored
> ----------------------------------------
>
> Key: PARQUET-577
> URL: https://issues.apache.org/jira/browse/PARQUET-577
> Project: Parquet
> Issue Type: Bug
> Components: parquet-avro
> Affects Versions: 1.6.0
> Reporter: Remek Zajac
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)