Posted to issues@beam.apache.org by "Tao Li (Jira)" <ji...@apache.org> on 2021/02/25 03:22:00 UTC

[jira] [Commented] (BEAM-4587) Test interoperability between Spark, Flink and Beam in terms of reading/writing Parquet files

    [ https://issues.apache.org/jira/browse/BEAM-4587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290644#comment-17290644 ] 

Tao Li commented on BEAM-4587:
------------------------------

[~ŁukaszG] I ran into an issue using Beam ParquetIO to read Spark-created Parquet files that contain array-type data. I also mentioned this issue in BEAM-11721 and BEAM-11527.

The problem is that when Spark writes Parquet, the writer automatically adds “list” and “element” fields for array fields. These “list” and “element” fields are annoying and sometimes really hard to handle in a Beam app. This is not a Beam issue; the blame lies with Spark for writing array-type data in a non-standard fashion.

To illustrate this issue, I have attached a Spark-created Parquet file to this ticket. Inspecting the file with parquet-tools shows the following metadata:

creator: parquet-mr version 1.10.1 (build 815bcfa4a4aacf66d207b3dc692150d16b5740b9)
extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"numbers","type":{"type":"array","elementType":"integer","containsNull":true},"nullable":true,"metadata":{}}]}

file schema: spark_schema
--------------------------------------------------------------------------------
numbers: OPTIONAL F:1
.list: REPEATED F:1
..element: OPTIONAL INT32 R:1 D:3

This Parquet schema leads to “list” and “element” fields in the Avro schema when Beam ParquetIO reads the file, as below:

{"type":"record","name":"spark_schema","fields":[{"name":"first_field","type":"int"},{"name":"numbers_1","type":{"type":"array","items":{"type":"record","name":"list","fields":[{"name":"element","type":"int"}]}}},{"name":"numbers_2","type":{"type":"array","items":{"type":"record","name":"list","namespace":"list2","fields":[{"name":"element","type":"int"}]}}}]}

Here is what a row looks like. Note the "element" wrapper around each value, which is annoying and undesirable.
{"first_field": 1, "numbers_1": [{"element": 1}, {"element": 2}], "numbers_2": [{"element": 1}, {"element": 2}]}
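
As a stopgap, the wrappers can also be stripped in application code after reading. The following is only a minimal sketch in plain Java: it models each wrapped item as a Map with a single "element" key instead of an Avro GenericRecord, and the class and method names (ElementUnwrapper, unwrap) are hypothetical, not part of Beam or parquet-avro.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ElementUnwrapper {
    // Hypothetical helper: turns [{"element": 1}, {"element": 2}] into [1, 2].
    static List<Object> unwrap(List<Map<String, Object>> wrapped) {
        List<Object> values = new ArrayList<>();
        for (Map<String, Object> item : wrapped) {
            // Assumption: every wrapper carries its value under the "element" key.
            // A null element stays null, matching containsNull=true in the Spark metadata.
            values.add(item.get("element"));
        }
        return values;
    }

    public static void main(String[] args) {
        // Rebuild the shape of the "numbers_1" field from the row shown above.
        List<Map<String, Object>> numbers1 = new ArrayList<>();
        for (int n : new int[] {1, 2}) {
            Map<String, Object> wrapper = new LinkedHashMap<>();
            wrapper.put("element", n);
            numbers1.add(wrapper);
        }
        System.out.println(unwrap(numbers1)); // prints [1, 2]
    }
}
```

In a real pipeline the same loop would presumably run over the records of the array field inside a per-element transform, and the output schema would have to be rewritten to match, which is exactly the extra work a reader-side setting would avoid.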

To solve this problem, I have been looking for a way to avoid these “list” and “element” fields when reading Spark-created Parquet files. I found one with AvroParquetReader: setting “parquet.avro.add-list-element-records” to false solves my problem. Please see the code below. The output Avro schema does not have “list” and “element” fields.
{noformat}
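        import org.apache.avro.generic.GenericRecord;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.Path;
        import org.apache.parquet.avro.AvroParquetReader;
        import org.apache.parquet.avro.AvroSchemaConverter;
        import org.apache.parquet.hadoop.ParquetReader;
        import org.apache.parquet.hadoop.util.HadoopInputFile;
        import org.apache.parquet.io.InputFile;

        Configuration conf = new Configuration();
        conf.setBoolean(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, false); // drop the "list"/"element" wrappers
        InputFile inputFile = HadoopInputFile.fromPath(new Path(filePath), conf);
        ParquetReader<GenericRecord> reader = AvroParquetReader
                .<GenericRecord>builder(inputFile)
                .withConf(conf)
                .build();
        GenericRecord nextRecord = reader.read();
        System.out.println(nextRecord.getSchema());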
  {noformat}

Then I tried to specify this setting through withConfiguration() (a new feature in Beam 2.28). In the code below, “ADD_LIST_ELEMENT_RECORDS” is set to false in the hope of avoiding the “list” and “element” fields. The input is Spark-created Parquet files that contain an array field.
{noformat}
Map<String, String> avroConfig = new HashMap<>();
avroConfig.put(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, "false");
PCollection<GenericRecord> inputData = pipeline.apply(ParquetIO.read(inputAvroSchema).from(path).withConfiguration(avroConfig));
{noformat}

However, for some reason the setting does not seem to take effect: I still see "list" and "element" fields in the schema of the PCollection. Maybe I am not using withConfiguration() correctly. Do you have any ideas on how to solve this problem? Thanks a lot!


> Test interoperability between Spark, Flink and Beam in terms of reading/writing Parquet files
> ---------------------------------------------------------------------------------------------
>
>                 Key: BEAM-4587
>                 URL: https://issues.apache.org/jira/browse/BEAM-4587
>             Project: Beam
>          Issue Type: Task
>          Components: io-java-parquet
>            Reporter: Lukasz Gajowy
>            Assignee: Lukasz Gajowy
>            Priority: P3
>             Fix For: 2.5.0
>
>         Attachments: from-spark.snappy.parquet
>
>
> Since ParquetIO is merged to master, we should test how it behaves with parquet files created by native Spark and Flink applications. 
> More specifically, we should:
>  - test if files created by Flink/Spark can be read successfully using ParquetIO in Beam
>  - test if files created by Beam using ParquetIO can be read using native Flink/Spark applications.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)