Posted to issues@beam.apache.org by "Tao Li (Jira)" <ji...@apache.org> on 2021/02/24 03:10:00 UTC

[jira] [Commented] (BEAM-11527) Support user configurable Hadoop Configuration flags for ParquetIO

    [ https://issues.apache.org/jira/browse/BEAM-11527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17289608#comment-17289608 ] 

Tao Li commented on BEAM-11527:
-------------------------------

Hi, I am using this new feature to solve a problem when using ParquetIO to read Spark-created Parquet files. Spark-created Parquet automatically adds "list" and "element" wrapper records to the schema for array fields, which leads to "list" and "element" fields in the Beam schema of the PCollection coming out of the ParquetIO reader. Here is the schema:

{"type":"record","name":"spark_schema","fields":[{"name":"first_field","type":"int"},{"name":"numbers_1","type":{"type":"array","items":{"type":"record","name":"{color:#DE350B}list{color}","fields":[{"name":"{color:#DE350B}element{color}","type":"int"}]}}},{"name":"numbers_2","type":{"type":"array","items":{"type":"record","name":"{color:#DE350B}list{color}","namespace":"list2","fields":[{"name":"{color:#DE350B}element{color}","type":"int"}]}}}]}

Here is a row:
{"first_field": 1, "numbers_1": [{"{color:#DE350B}element{color}": 1}, {"{color:#DE350B}element{color}": 2}], "numbers_2": [{"{color:#DE350B}element{color}": 1}, {"{color:#DE350B}element{color}": 2}]}

These "list" and "element" fields are really annoying and sometimes hard to handle in my Beam app, so I have been trying to figure out a way to avoid them when reading these Spark-created Parquet files. I found a way with AvroParquetReader: setting "parquet.avro.add-list-element-records" to false solves my problem. Please see the code below; the output Avro schema does not have "list" and "element" fields.
 
        import org.apache.avro.generic.GenericRecord;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.Path;
        import org.apache.parquet.avro.AvroParquetReader;
        import org.apache.parquet.avro.AvroSchemaConverter;
        import org.apache.parquet.hadoop.ParquetReader;
        import org.apache.parquet.hadoop.util.HadoopInputFile;
        import org.apache.parquet.io.InputFile;

        // Disable the "list"/"element" wrapper records in the converted Avro schema.
        Configuration conf = new Configuration();
        conf.setBoolean(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, false);
        InputFile inputFile = HadoopInputFile.fromPath(new Path(filePath), conf);
        ParquetReader<GenericRecord> reader = AvroParquetReader
                .<GenericRecord>builder(inputFile)
                .withConf(conf)
                .build();
        GenericRecord nextRecord = reader.read();
        System.out.println(nextRecord.getSchema());
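
With ADD_LIST_ELEMENT_RECORDS set to false, the printed schema contains plain int arrays, roughly like this (reconstructed by hand for illustration, not verbatim reader output):

{"type":"record","name":"spark_schema","fields":[{"name":"first_field","type":"int"},{"name":"numbers_1","type":{"type":"array","items":"int"}},{"name":"numbers_2","type":{"type":"array","items":"int"}}]}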
 
After reading the ParquetIO source code, it looks like the ADD_LIST_ELEMENT_RECORDS setting should take effect in AvroParquetReader first, and the input Avro schema should then be used for encoding. If this understanding is correct, the code below should just work. In it, ADD_LIST_ELEMENT_RECORDS is set to false to (hopefully) avoid the "list" and "element" fields, and inputAvroSchema is set to a normal Avro schema that does NOT contain "list" and "element" fields. The input is Spark-created Parquet files that contain an array field.
 
        Map<String, String> avroConfig = new HashMap<>(); // assuming a String-to-String map here
        avroConfig.put(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, "false");
        PCollection<GenericRecord> inputData = pipeline()
                .apply(ParquetIO.read(inputAvroSchema).from(path).withConfiguration(avroConfig));
 
However I see this error:
java.lang.ClassCastException: shaded.org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
                at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
                at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
                at shaded.org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:192)
                at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
                at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
 
This error indicates that the data being written with GenericDatumWriter still contains the "list" field, which is a record type. It seems the ADD_LIST_ELEMENT_RECORDS setting is not taking effect. Am I using the Avro config correctly?
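
For reference, the same cast failure can be reproduced with plain Avro outside of Beam, when the writer schema declares a plain int array but the datum still carries the wrapper records. A minimal sketch (the schemas below are hand-built to mirror the ones above):

        import java.io.ByteArrayOutputStream;
        import java.util.Collections;
        import org.apache.avro.Schema;
        import org.apache.avro.SchemaBuilder;
        import org.apache.avro.generic.GenericData;
        import org.apache.avro.generic.GenericDatumWriter;
        import org.apache.avro.generic.GenericRecord;
        import org.apache.avro.io.BinaryEncoder;
        import org.apache.avro.io.EncoderFactory;

        // Writer schema: "numbers_1" is a plain array of ints, no wrapper records.
        Schema writerSchema = SchemaBuilder.record("spark_schema").fields()
                .name("numbers_1").type().array().items().intType().noDefault()
                .endRecord();
        // Wrapper-record schema of the kind Spark-style Parquet arrays carry.
        Schema listSchema = SchemaBuilder.record("list").fields()
                .name("element").type().intType().noDefault()
                .endRecord();
        // A datum whose array still holds {"element": 1} wrapper records.
        GenericRecord wrapper = new GenericData.Record(listSchema);
        wrapper.put("element", 1);
        GenericRecord row = new GenericData.Record(writerSchema);
        row.put("numbers_1", Collections.singletonList(wrapper));
        // Encoding against the int-array schema fails in writeArray with
        // "GenericData$Record cannot be cast to java.lang.Number", matching the trace above.
        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(writerSchema);
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(new ByteArrayOutputStream(), null);
        writer.write(row, encoder);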

> Support user configurable Hadoop Configuration flags for ParquetIO
> ------------------------------------------------------------------
>
>                 Key: BEAM-11527
>                 URL: https://issues.apache.org/jira/browse/BEAM-11527
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-parquet
>            Reporter: Anant Damle
>            Assignee: Anant Damle
>            Priority: P2
>              Labels: parquet
>             Fix For: 2.28.0
>
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Provide a user-configurable interface to supply a Hadoop Configuration for ParquetIO.
> Current behaviour supports only the AvroReadSupport.AVRO_COMPATIBILITY flag.
>  
> There are now more reader options, introduced through [PARQUET-1928|https://issues.apache.org/jira/browse/PARQUET-1928] and [PR/831|https://github.com/apache/parquet-mr/pull/831].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)