You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@parquet.apache.org by Ryan Blue <rb...@netflix.com.INVALID> on 2018/06/18 19:49:35 UTC

Re: Very slow complex type column reads from parquet

Jakub,

I'm moving the Spark list to bcc and adding the Parquet list, since you're
probably more interested in Parquet tuning.

It makes sense that you're getting better performance when you have more
matching rows distributed, especially if those rows have a huge column that
you need to project. You're just able to use more processing power at once.
Finding a good layout or sort to distribute the data is a good start, and
you're right that decreasing the row group size would help: more (matching)
row groups, more parallelism. Just be careful because decreasing the row
group size can inflate the overall size of the data.

Parquet will also eliminate row groups in each task when possible. At the
start of the task, it uses the row group's column stats (min, max, num
nulls), and before the row group is read it will also attempt to read any
column dictionaries and use those to see if any values match.

If a row group can't be eliminated, all of the rows in that group will be
materialized. One thing that's probably really hurting you here is that
each row from Parquet is copied in memory twice for this read path. I
pointed this out recently and that's why I'm suggesting we clean up the
expectations of Spark SQL operators (see this comment
<https://github.com/apache/spark/pull/21118#issuecomment-386647848> and the dev
list thread
<https://lists.apache.org/thread.html/ee1f77d8f6f35a1d873735e7af88fb5d76957c2488ef84c722f773e7@%3Cdev.spark.apache.org%3E>
).

There are patches out in the Parquet community to do page-level filtering
as well, which will help your use case quite a bit.

rb

On Fri, Jun 15, 2018 at 1:44 AM, Jakub Wozniak <ja...@cern.ch>
wrote:

> Hello,
>
> I’m sorry to bother you again but it is quite important for us to
> understand the problem better.
>
> One more finding in our problem is that the performance of queries in a
> timestamp sorted file depend a lot on the predicate timestamp.
> If you are lucky to get some records from the start of the row group it
> might be fast (like seconds). If you search for something that is at the
> end of the row group the query takes minutes.
> I guess this is due to the fact that it has to scan all the previous
> records in the row group until it finds the right ones at the end of it...
>
> Now I have a couple of questions regarding the way the Parquet file is
> read.
>
> 1) Does it always decode the query column (projection from select) even if
> the predicate column does not match (to me it looks like it but I might be
> wrong)?
> 2) Sorting the file will result in “indexed’ row groups so it will be
> easier to locate the which row group to scan but isn’t it at the same time
> limiting parallelism? If the data is randomly placed in the row groups it
> will be searched with as many tasks as we have row groups, right (or at
> least more than 1)? Is there any common rule we can formulate or it is very
> data and/or query dependent?
> 3) Would making a row group smaller (like by half) help? Currently I can
> see that the row groups are about the size of the hdfs block (256MB) but
> sometimes smaller or even bigger.
> We have no settings for the row group so I guess the default hdfs block
> size is used, right?
> Do you have any recommendation / experience with that?
>
> Thanks a lot for your help,
> Jakub
>
>
>
> On 14 Jun 2018, at 12:07, Jakub Wozniak <ja...@cern.ch> wrote:
>
> Dear Ryan,
>
> Thanks a lot for your answer.
> After having sent the e-mail we have investigated a bit more the data
> itself.
> It happened that for certain days it was very skewed and one of the row
> groups had much more records that all others.
> This was somehow related to the fact that we have sorted it using our
> object ids and by chance those that went first were smaller (or compressed
> better).
> So the Parquet file had a 6 rows groups where the first one had 300k rows
> and others only 30k rows.
> The search for a given object fell into the first row group and lasted
> very long time.
> The data itself was very much compressed as it contained a lot of zeros.
> To give some numbers the 600MB parquet file expanded to 56GB in JSON.
>
> What we did is to sort the data not by object id but by the record
> timestamp which resulted in much more even data distribution among the row
> groups.
> This in fact helped a lot for the query time (using the timestamp & object
> id)
>
> I have to say that I haven't fully understood this phenomenon yet as I’m
> not a Parquet format & reader expert (at least not yet).
> Maybe it is just a simple function of how many records Spark has to scan
> and the level of parallelism (searching for a given object id when sorted
> by time needs to scan all/more the groups for larger times).
> One question here - is Parquet reader reading & decoding the projection
> columns even if the predicate columns should filter the record out?
>
> Unfortunately we have to have those big columns in the query as people
> want to do analysis on them.
>
> We will continue to investigate…
>
> Cheers,
> Jakub
>
>
>
> On 12 Jun 2018, at 22:51, Ryan Blue <rb...@netflix.com> wrote:
>
> Jakub,
>
> You're right that Spark currently doesn't use the vectorized read path for
> nested data, but I'm not sure that's the problem here. With 50k elements in
> the f1 array, it could easily be that you're getting the significant
> speed-up from not reading or materializing that column. The non-vectorized
> path is slower, but it is more likely that the problem is the data if it is
> that much slower.
>
> I'd be happy to see vectorization for nested Parquet data move forward,
> but I think you might want to get an idea of how much it will help before
> you move forward with it. Can you use Impala to test whether vectorization
> would help here?
>
> rb
>
>
>
> On Mon, Jun 11, 2018 at 6:16 AM, Jakub Wozniak <ja...@cern.ch>
> wrote:
>
>> Hello,
>>
>> We have stumbled upon a quite degraded performance when reading a complex
>> (struct, array) type columns stored in Parquet.
>> A Parquet file is of around 600MB (snappy) with ~400k rows with a field
>> of a complex type { f1: array of ints, f2: array of ints } where f1 array
>> length is 50k elements.
>> There are also other fields like entity_id: long, timestamp: long.
>>
>> A simple query that selects rows using predicates entity_id = X and
>> timestamp >= T1 and timestamp <= T2 plus ds.show() takes 17 minutes to
>> execute.
>> If we remove the complex type columns from the query it is executed in a
>> sub-second time.
>>
>> Now when looking at the implementation of the Parquet datasource the
>> Vectorized* classes are used only if the read types are primitives. In
>> other case the code falls back to the parquet-mr default implementation.
>> In the VectorizedParquetRecordReader there is a TODO to handle complex
>> types that "should be efficient & easy with codegen".
>>
>> For our CERN Spark usage the current execution times are pretty much
>> prohibitive as there is a lot of data stored as arrays / complex types…
>> The file of 600 MB represents 1 day of measurements and our data
>> scientists would like to process sometimes months or even years of those.
>>
>> Could you please let me know if there is anybody currently working on it
>> or maybe you have it in a roadmap for the future?
>> Or maybe you could give me some suggestions how to avoid / resolve this
>> problem? I’m using Spark 2.2.1.
>>
>> Best regards,
>> Jakub Wozniak
>>
>>
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: dev-unsubscribe@spark.apache.org
>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>
>
>
>


-- 
Ryan Blue
Software Engineer
Netflix