Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/26 12:49:40 UTC

[GitHub] [arrow-rs] alamb commented on issue #200: [Rust][Parquet] Use iterators to increase performance of creating Arrow arrays

alamb commented on issue #200:
URL: https://github.com/apache/arrow-rs/issues/200#issuecomment-826808516


   Comment from Jorge Leitão(jorgecarleitao) @ 2021-03-06T19:50:27.340+0000:
    <pre>Not much to say other than AWESOME! Very much looking forward to it!</pre>
   
   Comment from Neville Dipale(nevi_me) @ 2021-03-07T09:34:38.277+0000:
    <pre>This sounds like a solid proposal; I also like the split that you suggest :)</pre>
   
   Comment from Daniël Heres(Dandandan) @ 2021-03-07T09:56:26.934+0000:
   <pre>That sounds like a cool idea. I like the idea of a very thin abstraction that doesn't sacrifice performance.
   
    For the iterator type, I think the count might not (always) be necessary? It can depend on the datatype, or will always be the same (1 or 32 / etc.) for the other types? Are there situations where we really need the count?</pre>
   
   Comment from Yordan Pavlov(yordan-pavlov) @ 2021-03-07T11:00:25.271+0000:
   <pre>[~Dandandan] regarding the count values, yes you are right - in the case of string arrays, the generated count values will always equal 1. But the count values may still be useful in cases where a primitive array is split across multiple non-contiguous slices, e.g. due to page boundaries. It may be possible to calculate the count values based on the data type (I have to think more about that), but at the moment I still like how they make the expected value count explicit. This could change during implementation though.</pre>
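    
    A minimal illustration of the item shape being discussed above (the names are illustrative only, not from the parquet crate): each raw slice is paired with an explicit count of the values it contains.
    
    // Illustrative only: one possible item shape for the value-slice iterator,
    // pairing each contiguous slice with the number of values it contains.
    struct ValueSlice<'a> {
        bytes: &'a [u8], // raw encoded bytes, possibly only part of a page
        count: usize,    // number of logical values in `bytes` (always 1 for string slices)
    }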
   
   Comment from Daniël Heres(Dandandan) @ 2021-03-07T21:38:03.584+0000:
   <pre>[~yordan-pavlov] makes sense, thanks!</pre>
   
   Comment from Daniël Heres(Dandandan) @ 2021-03-27T11:54:01.798+0000:
   <pre>[~yordan-pavlov] just checking - any updates to share and could you use some help?
   Any idea yet how the work could be split into multiple issues / PRs?
   Maybe I could focus on a subtask if we can split the work.
   
    I think it would be amazing to have a faster Parquet reader, even if it is "only" 5-10% faster - as it's a large performance bottleneck now :).
   
   Do you have some WIP code & experiments that could use a review?</pre>
   
   Comment from Yordan Pavlov(yordan-pavlov) @ 2021-03-28T23:04:23.361+0000:
    <pre>Hi [~Dandandan], apologies, I should have updated you on my progress earlier, but I was busy trying things out.
   
    My thinking so far has been along the lines of replacing pretty much the entire path from parquet pages all the way to arrow arrays with iterators (because I am hoping that an iterator-based implementation would minimize unnecessary memory allocation). Something like this:
    Iterator<RowGroup> >> Iterator<(ColumnChunkContext, Page)> >> Iterator<(ValueSliceIterator, DefLevelIterator, RepLevelIterator)> >> (Iterator<ValueSliceIterator>, Iterator<DefLevelIterator>, Iterator<RepLevelIterator>)
   So far I have implemented splitting an iterator into multiple (parallel) iterators based on [https://stackoverflow.com/questions/25586681/splitting-iteratora-b-into-iteratora-and-iteratorb#25588440]
   
    This will be useful, as illustrated above, for splitting an iterator over pages into iterators over values, def levels and rep levels, which can be consumed independently (but usually in parallel). The general technique is sketched below.
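    
    A minimal sketch of that splitting technique (all names here are illustrative, not from the parquet crate): the source iterator is shared behind Rc<RefCell<...>>, and each output side buffers the items it pulls that belong to the other side. A two-way split is shown for brevity; the three-way (values, def levels, rep levels) split follows the same pattern.
    
    use std::cell::RefCell;
    use std::collections::VecDeque;
    use std::rc::Rc;
    
    // Shared state: the source iterator plus one look-ahead buffer per output side.
    struct Shared<A, B> {
        iter: Box<dyn Iterator<Item = (A, B)>>,
        left: VecDeque<A>,
        right: VecDeque<B>,
    }
    
    struct Left<A, B>(Rc<RefCell<Shared<A, B>>>);
    struct Right<A, B>(Rc<RefCell<Shared<A, B>>>);
    
    impl<A, B> Iterator for Left<A, B> {
        type Item = A;
        fn next(&mut self) -> Option<A> {
            let mut s = self.0.borrow_mut();
            if let Some(a) = s.left.pop_front() {
                return Some(a);
            }
            match s.iter.next() {
                // buffer the half that belongs to the other side
                Some((a, b)) => {
                    s.right.push_back(b);
                    Some(a)
                }
                None => None,
            }
        }
    }
    
    impl<A, B> Iterator for Right<A, B> {
        type Item = B;
        fn next(&mut self) -> Option<B> {
            let mut s = self.0.borrow_mut();
            if let Some(b) = s.right.pop_front() {
                return Some(b);
            }
            match s.iter.next() {
                Some((a, b)) => {
                    s.left.push_back(a);
                    Some(b)
                }
                None => None,
            }
        }
    }
    
    // Split an iterator of pairs into two iterators that can be consumed independently.
    fn split<A: 'static, B: 'static>(
        iter: impl Iterator<Item = (A, B)> + 'static,
    ) -> (Left<A, B>, Right<A, B>) {
        let shared = Rc::new(RefCell::new(Shared {
            iter: Box::new(iter),
            left: VecDeque::new(),
            right: VecDeque::new(),
        }));
        (Left(shared.clone()), Right(shared))
    }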
   
    Also, in the past week I have been working on splitting an iterator of byte slices into iterators that return no more than batch_size items - I have almost figured out how to do this; I just have to make it a bit more generic and do some more benchmarking. I would also like to do some benchmarking with [https://docs.rs/hyper/0.14.4/hyper/body/struct.Bytes.html] (which appears to be an alternative implementation of the ByteBufferPtr that already exists in the parquet crate).
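    
    A minimal sketch of the batching idea using only the standard library (the real adapter would avoid collecting into a Vec; `for_each_batch` is an illustrative name):
    
    // Consume any iterator in batches of at most `batch_size` items.
    fn for_each_batch<T>(
        mut iter: impl Iterator<Item = T>,
        batch_size: usize,
        mut on_batch: impl FnMut(Vec<T>),
    ) {
        loop {
            // `by_ref` lets `take` borrow the iterator, so the remainder is still
            // available on the next pass.
            let batch: Vec<T> = iter.by_ref().take(batch_size).collect();
            if batch.is_empty() {
                break;
            }
            on_batch(batch);
        }
    }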
   
   Figuring out exactly how the work will be split into different PRs is what I will focus on next, but I already have some ideas:
   
    I think it would be best to start small, by building on PageIterator::next() -> PageReader to produce an iterator of pages, something like:
   
    
    // create an iterator of (contiguous) data slices across all pages from all row groups
    row_group_iter // iter of PageReader (one per row group / column chunk)
      // attach a shared per-column-chunk context to every page; the context stores
      // dictionaries for dictionary-encoded chunks
      .flat_map(|page_reader| {
          let context = Rc::new(RefCell::new(IterContext::new()));
          page_reader.map(move |page| (context.clone(), page))
      }) // iter of (shared ColumnChunkContext, Page)
      .map(|(context, page)| {
          // the decoder looks up / stores the chunk's dictionary in the shared context
          let mut context = context.borrow_mut();
          get_decoder(&mut context, page)
      }) // iter of decoders, each an Iterator<Item = AsRef<[u8]>>
      .flatten() // iter of AsRef<[u8]> value slices
    
   
   Iterating over pages is something that is implemented inconsistently for primitive and complex types, and I would like to ultimately merge the two implementations, so that there is no more primitive or complex array reader, just a single arrow array reader using adapters / converters for different types of arrays.
   
    Also, the decoding functionality implemented on each parquet type is only used by the plain decoder (and not by any other decoder), so I would look to move it away from the types and into the plain decoder where it belongs.
   
   Then, I would look into implementing the Iterator<Item = AsRef<[u8]>> idea for the different decoders and also into how exactly the adaptors / converters for different types of arrays would work.
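    
    For a fixed-width, plain-encoded column, the Iterator<Item = AsRef<[u8]>> idea could look roughly like the sketch below (`plain_fixed_width` is a hypothetical helper, not an existing decoder):
    
    // Expose a plain-encoded, fixed-width page buffer as an iterator of per-value
    // byte slices; &[u8] implements AsRef<[u8]>, so converters can stay generic.
    fn plain_fixed_width(page_data: &[u8], value_size: usize) -> impl Iterator<Item = &[u8]> + '_ {
        page_data.chunks_exact(value_size)
    }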
   
   I am open to suggestions on how we could collaborate better on this. Let me know what you think.</pre>
   
   Comment from Jorge Leitão(jorgecarleitao) @ 2021-03-29T05:40:15.197+0000:
    <pre>FWIW, I started going through the parquet crate and re-writing some parts of it. There are many, many opportunities to improve performance there.
   
    I also agree with you that we should push the "to arrow" conversion down to the page level. Also, IMO we should scrap the generic "DataType" path and instead provide specific implementations for boolean, (i32, i64, float, double), byteArray, and FixedByteArray.
   
    I am looking into the encodings, and IMO there is some groundwork that we need to do before going for the arrow-specific problem.
   
    I am looking at the RLE encoding, and I think that it may not be correct atm. Parquet [expects a 4-byte length|https://github.com/apache/parquet-format/blob/master/Encodings.md#run-length-encoding--bit-packing-hybrid-rle--3], but we only take a 1-byte length (i.e. up to 255 values). I do not know how we can even read def levels, rep levels, and boolean values with our encoder atm.
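    
    For reference, the prefix in question is a 4-byte little-endian byte count placed in front of the RLE / bit-packed runs; a minimal sketch of honouring it (a hypothetical helper, not the crate's decoder):
    
    use std::convert::TryInto;
    
    // Split an RLE/bit-packing hybrid buffer into its encoded payload and the remainder,
    // using the 4-byte little-endian length prefix described in the Parquet spec linked above.
    fn split_rle_payload(buf: &[u8]) -> Option<(&[u8], &[u8])> {
        let len = u32::from_le_bytes(buf.get(0..4)?.try_into().ok()?) as usize;
        let rest = &buf[4..];
        if len > rest.len() {
            return None;
        }
        Some((&rest[..len], &rest[len..]))
    }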
   
   I also found [this crate|https://github.com/tantivy-search/bitpacking/issues] that seems to be implementing the encoding we need, including ordered, with SIMD instructions. We could probably think about depending on it.
   
    What I did so far: created a new repo from scratch and started moving things there bit by bit, going through a full review of the code (my personal way of reading and understanding code).
   
   I think that the easiest way would be to have a call where we would align knowledge and priorities.</pre>
   
   Comment from Yordan Pavlov(yordan-pavlov) @ 2021-03-29T09:24:04.726+0000:
    <pre>[~jorgecarleitao] thank you for looking into the parquet encoding code; I was also looking into the RLE code, because I needed to understand how it would fit with an Iterator<AsRef<[u8]>> abstraction. I do agree that the RLE code needs improvement / simplification; it could also be made faster (e.g. using SIMD), and if a library can be used to do all that - great. I also agree that there are many improvement opportunities throughout the parquet crate, and it will continue to be an area of focus for me for a while, but sadly I only have a couple of hours per day to spare.
   
   When you said "to have a call" what did you have in mind in terms of frequency (e.g. weekly, bi-weekly, etc.) and channel (zoom, telegram, etc.) ?</pre>
   
   Comment from Jorge Leitão(jorgecarleitao) @ 2021-03-30T06:14:34.779+0000:
   <pre>Ok, so, just to give a heads up: I have been experimenting with the code, and here is the result so far: [https://github.com/jorgecarleitao/parquet2]
   
   I was able to read a parquet file with arbitrary parallelism (the IO-CPU tradeoff is delegated to downstream). The missing parts are decoding and deserialization, which IMO is what [~yordan-pavlov] is thinking about.
   
    I reduced the problem to: given an iterator of decompressed (but encoded) pages, convert it to an arrow Array. IMO when no encoding is used, we can do a back-to-back copy or similar (e.g. Int96 is special). When encoding is used, we should probably decode directly into buffers, so that we avoid an extra memcopy.
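    
    For the un-encoded case, the "back-to-back" idea can be as simple as the sketch below (assuming a little-endian machine and a fixed-width physical type such as INT32; the function name is illustrative):
    
    // Plain-encoded fixed-width values already match Arrow's in-memory layout on
    // little-endian machines, so a decompressed page can be appended to the value
    // buffer with a single copy instead of decode-into-a-Vec plus a second memcpy.
    fn append_plain_values(value_buffer: &mut Vec<u8>, decompressed_page_values: &[u8]) {
        value_buffer.extend_from_slice(decompressed_page_values);
    }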
   
   [~yordan-pavlov], do you use slack? There is an arrow-rust channel on the official Apache slack: [https://the-asf.slack.com/archives/C01QUFS30TD] We could sync there.
   
    </pre>
   
   Comment from Yordan Pavlov(yordan-pavlov) @ 2021-03-30T08:54:56.298+0000:
   <pre>[~jorgecarleitao] I would be happy to have a chat in Slack, but it appears that an @apache.org email address is necessary to join and I don't have one.
   
   Also, I noticed that in your parquet2 repo, a separate page iterator is created for each row group, very similar to how it works currently. I was planning to wrap multiple row group page iterators into a single iterator returning a sequence of pages from multiple row groups (see the code snippet in my previous comment).</pre>
   
   Comment from Daniël Heres(Dandandan) @ 2021-03-30T10:12:09.111+0000:
   <pre>[~yordan-pavlov] you can join the apache slack here: https://s.apache.org/slack-invite</pre>
   
   Comment from Jorge Leitão(jorgecarleitao) @ 2021-03-30T10:48:14.441+0000:
   <pre>I see. To understand: is there a reason why this should be in [Parquet] instead of in [DataFusion]? I.e. why should we push a specific parallelism strategy to the library?
   
    Asking this because, the way I see it, the parquet crate can't tell which use-case it is being used for, and so can't provide an optimal strategy (one record per page, per row group, per file, or per set of files?). For example, s3 vs hdfs vs local file-system typically require different parallelism strategies.
   
   My hypothesis (which may be wrong!) is that the parquet crate should offer "units of work" that can be divided/parallelized according to IO (e.g. s3 vs filesystem), memory and CPU constraints that each consumer has, and allow consumers of the library (e.g. DataFusion, Polars, Ballista, s3 vs hdfs vs file-system) to design strategies that fit their constraints the best, by assembling these units according to their compute model.</pre>
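    
    One way to read the "units of work" hypothesis (shape and names are assumptions, not the parquet2 API): the crate enumerates independently decodable units and the consumer decides how to schedule them.
    
    // Enumerate independently decodable (row group, column) units; a consumer such as
    // DataFusion can map these onto threads, async tasks, or sequential reads according
    // to its own IO / CPU constraints.
    fn column_chunk_units(
        num_row_groups: usize,
        num_columns: usize,
    ) -> impl Iterator<Item = (usize, usize)> {
        (0..num_row_groups)
            .flat_map(move |row_group| (0..num_columns).map(move |column| (row_group, column)))
    }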
   
   Comment from Yordan Pavlov(yordan-pavlov) @ 2021-04-05T18:51:49.565+0000:
    <pre>UPDATE: after spending the past few weeks figuring out how different steps could be implemented on the way from page buffer to arrow array (such as creating an iterator of pages across row groups, sharing dictionary data between pages in the same column chunk, splitting the page buffer into different iterators for data, rep and def levels, and reading batches of values), my next step is going to be implementing this idea end-to-end for a particular type of array (StringArray). In this way the idea can be tested sooner (in terms of performance, etc.), reviewed, and feedback collected, before expanding the implementation to more types. I hope to have an initial implementation in about a week.</pre>
   
   Comment from Yordan Pavlov(yordan-pavlov) @ 2021-04-13T21:51:19.494+0000:
   <pre>I have finally been able to assemble enough code to demonstrate the core idea and have created a branch here [https://github.com/yordan-pavlov/arrow/commit/c62c5394726b79d428a93e2593d0c24da3c9d286#diff-dce1a37fc60ea0c8d13a61bf530abbf9f82aef43224597f31a7ba4d9fe7bd10dR258]
   
   The test doesn't pass yet, but the code compiles and demonstrates how an iterator could be created over many pages from many row groups / column chunks, and then split into separate iterators for (values, def levels, rep levels) and then read in batches.
   
   The iterator is created in ArrowArrayReader::try_new and used in <ArrowArrayReader as ArrayReader>::next_batch.
   
   My plan is that ArrowArrayReader will replace both PrimitiveArrayReader and ComplexObjectArrayReader when arrow array converters have been implemented for all types.
   
   Feedback is most welcome.
   
   Next steps are:
    * complete implementation to define arrow array converter interface
    * implement decoder iterator for def / rep levels
    * implement decoder iterator for plain encoding
    * implement StringArray converter
    * make unit test pass
    * attempt to replace ComplexObjectArrayReader for StringArrays
    * benchmark performance
    * create initial PR
   
   After this initial PR, implementing arrow array converters for the remaining types could be done in separate PRs.</pre>
   
   Comment from Yordan Pavlov(yordan-pavlov) @ 2021-04-18T20:13:44.587+0000:
   <pre>UPDATE: over the past few days I managed to finish the core implementation of the new ArrowArrayReader with the key bits being:
    * the converters will only produce an all-value / no-null ArrayData instance - this simplifies the converter interface and keeps all other logic generic
    * if no def levels are available, this no-null ArrayData produced from the converter is simply converted to an array and returned without changes
     * if def levels are available, a BooleanArray is created from the def levels and used to efficiently determine how many values to read, and also to efficiently insert NULLs using MutableArrayData (with an algorithm very similar to zip(); see the sketch after this list) - this implementation re-uses as much of the existing arrow code as possible
    * the StringArray converter has been implemented as a function before moving to a converter in a later change
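    
    A rough sketch of the zip()-like null-insertion step mentioned in the list above, using arrow's MutableArrayData (copying one slot at a time for clarity; a real implementation would copy contiguous runs):
    
    use arrow::array::{ArrayData, MutableArrayData};
    
    // Merge an all-value (no-null) ArrayData with a validity mask derived from the
    // def levels: copy the next value where valid, append a null otherwise.
    fn merge_nulls(values: &ArrayData, validity: &[bool]) -> ArrayData {
        let mut mutable = MutableArrayData::new(vec![values], true, validity.len());
        let mut next_value = 0;
        for &is_valid in validity {
            if is_valid {
                mutable.extend(0, next_value, next_value + 1);
                next_value += 1;
            } else {
                mutable.extend_nulls(1);
            }
        }
        mutable.freeze()
    }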
   
   Next steps are:
    * implement decoder iterator for def / rep levels
    * implement decoder iterator for plain encoding
    * make unit test pass
    * attempt to replace ComplexObjectArrayReader for StringArrays
    * benchmark performance
    * create initial PR
   
   the latest changes can be found here:
   
   https://github.com/yordan-pavlov/arrow/commit/7299f2a747cc52237c21b9d85df994a66097d731#diff-dce1a37fc60ea0c8d13a61bf530abbf9f82aef43224597f31a7ba4d9fe7bd10dR418</pre>
   
   Comment from Yordan Pavlov(yordan-pavlov) @ 2021-04-21T21:49:50.476+0000:
   <pre>UPDATE: I have now implemented the level decoder iterator and support for def and rep levels in the ArrowArrayReader here:
   
   [https://github.com/yordan-pavlov/arrow/commit/3a820c58747cf692efaf90b7bc3716d60b6ecb85]
   
    This commit includes a change to load def / rep levels into an Int16Array, which is used to efficiently calculate the null bitmap for values from def levels using arrow::compute::eq_scalar.</pre>
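    
    That step can be sketched as follows (assuming a flat column and comparing against its maximum definition level; the function name is illustrative):
    
    use arrow::array::{BooleanArray, Int16Array};
    use arrow::compute::eq_scalar;
    use arrow::error::Result;
    
    // Definition levels equal to the maximum level mark non-null slots, so comparing the
    // Int16Array of levels against that maximum yields the null bitmap as a BooleanArray.
    fn def_levels_to_validity(def_levels: &Int16Array, max_def_level: i16) -> Result<BooleanArray> {
        eq_scalar(def_levels, max_def_level)
    }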
   
   Comment from Yordan Pavlov(yordan-pavlov) @ 2021-04-25T20:58:50.034+0000:
    <pre>UPDATE: I have added the ArrayConverter trait and implemented decoder iterators for plain encoding, and the string array test now passes.
   
   the latest changes can be found here: [https://github.com/yordan-pavlov/arrow/commit/dc93466510c6be1c6a21a61b1e948a3fa7959a9a]
   
   Next steps are:
    * attempt to replace ComplexObjectArrayReader for StringArrays
    * implement missing parts to make ArrowArrayReader work for StringArrays (likely RLE and dictionary encodings)
    * benchmark performance
    * create initial PR</pre>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org