Posted to dev@iceberg.apache.org by Gautam <ga...@gmail.com> on 2019/07/20 00:22:46 UTC

Re: Approaching Vectorized Reading in Iceberg ..

Hey Guys,
           Sorry about the delay on this. I just got back to this and now have a
basic working implementation of vectorization for primitive types in Iceberg.

*Here's what I have so far:*

I have added `ParquetValueReader` implementations for some basic primitive
types that build the corresponding Arrow vector (`ValueVector`), viz.
`IntVector` for int, `VarCharVector` for strings, and so on. Underneath each
value vector reader there are column iterators that read from the Parquet
page stores (row groups) in chunks. These `ValueVector`-s are exposed as
`ArrowColumnVector`-s (Spark's ColumnVector wrapper backed by Arrow) and
stitched together by a `ColumnarBatchReader` (which, as the name suggests,
wraps ColumnarBatches in the iterator). I've verified that these pieces work
properly with the underlying interfaces. I've also made changes to Iceberg's
`Reader` to implement `planBatchPartitions()` (adding the
`SupportsScanColumnarBatch` mixin to the reader), so the reader now returns
ColumnarBatch instances instead of InternalRow. The query planning runtime
works fine with these changes.
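
To make that concrete, here is a minimal, hypothetical sketch of the shape of
one such primitive reader (class and member names are illustrative, not the
ones in my branch): it drains an iterator of values (standing in for Iceberg's
ColumnIterator) into an Arrow IntVector and hands the result to Spark wrapped
in an ArrowColumnVector.

import java.util.Iterator;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
import org.apache.spark.sql.vectorized.ColumnVector;

// Illustrative only: fills up to `batchSize` int values into an Arrow IntVector.
class IntBatchReader {
  private final IntVector vector;

  IntBatchReader(BufferAllocator allocator) {
    this.vector = new IntVector("int_col", allocator);
  }

  ColumnVector read(Iterator<Integer> column, int batchSize) {
    vector.allocateNew(batchSize);    // allocate buffers for this batch
    int rows = 0;
    while (rows < batchSize && column.hasNext()) {
      Integer value = column.next();
      if (value == null) {
        vector.setNull(rows);         // nulls go through the validity buffer
      } else {
        vector.setSafe(rows, value);  // setSafe grows the buffers if needed
      }
      rows++;
    }
    vector.setValueCount(rows);
    // ArrowColumnVector is Spark's Arrow-backed ColumnVector implementation
    return new ArrowColumnVector(vector);
  }
}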

It fails during query execution, though. The bit it's currently failing
at is this line of code:
https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414

This code, I think, tries to apply the iterator's schema projection to the
InternalRow instances. This seems to be tightly coupled to InternalRow, as
Spark's catalyst expressions have implemented UnsafeProjection for
InternalRow only. If I take this out and just return the
`Iterator<ColumnarBatch>` iterator I built, it returns an empty result on the
client. I'm guessing this is because Spark is unaware of the iterator's
schema? There's a TODO in the code that says "*remove the projection by
reporting the iterator's schema back to Spark*". Is there a simple way to
communicate that to Spark for my new iterator? Any pointers on how to get
around this?
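
(For context, the idea would be to report the schema through the reader itself
rather than via a projection. A rough sketch against Spark 2.4's DataSourceV2
API follows; `projectedSparkSchema()` and `planBatchTasks()` are placeholders,
not methods that exist in the Iceberg Reader.)

import java.util.List;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

// Sketch: with SupportsScanColumnarBatch, readSchema() tells Spark the shape of
// the ColumnarBatch rows, so no UnsafeProjection over InternalRow is needed.
abstract class BatchReaderSketch implements SupportsScanColumnarBatch {

  @Override
  public StructType readSchema() {
    // report the projected schema directly to Spark
    return projectedSparkSchema();
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchPartitions() {
    return planBatchTasks();
  }

  abstract StructType projectedSparkSchema();                      // placeholder

  abstract List<InputPartition<ColumnarBatch>> planBatchTasks();   // placeholder
}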


Thanks and Regards,
-Gautam.




On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com> wrote:

> Replies inline.
>
> On Fri, Jun 14, 2019 at 1:11 AM Gautam <ga...@gmail.com> wrote:
>
>> Thanks for responding Ryan,
>>
>> Couple of follow up questions on ParquetValueReader for Arrow..
>>
>> I'd like to start by testing Arrow out with readers for primitive types
>> and incrementally add in Struct/Array support; also, ArrowWriter [1]
>> currently doesn't have converters for the map type. How can I default these
>> types to regular materialization while supporting Arrow-based reading for
>> primitives?
>>
>
> We should look at what Spark does to handle maps.
>
> I think we should get the prototype working with test cases that don't
> have maps, structs, or lists. Just getting primitives working is a good
> start and just won't hit these problems.
>
>
>> Lemme know if this makes sense...
>>
>> - I extend PrimitiveReader (for Arrow) to load primitive types into
>> ArrowColumnVectors of the corresponding column types by iterating over the
>> underlying ColumnIterator *n times*, where n is the batch size.
>>
>
> Sounds good to me. I'm not sure about extending vs wrapping because I'm
> not too familiar with the Arrow APIs.
>
>
>> - Reader.newParquetIterable() maps primitive column types to the newly
>> added ArrowParquetValueReader, but for other types (nested types, etc.) uses
>> the current *InternalRow*-based ValueReaders
>>
>
> Sounds good for primitives, but I would just leave the nested types
> un-implemented for now.
>
>
>> - Stitch the column vectors together to create a ColumnarBatch (since the
>> *SupportsScanColumnarBatch* mixin currently expects this) .. *although I'm
>> a bit lost on how the stitching of columns happens currently*? .. and
>> how could the ArrowColumnVectors be stitched alongside regular columns
>> that don't have Arrow-based support?
>>
>
> I don't think that you can mix regular columns and Arrow columns. It has
> to be all one or the other. That's why it's easier to start with
> primitives, then add structs, then lists, and finally maps.
>
>
>> - Reader returns readTasks as *InputPartition<ColumnarBatch>* so that
>> DataSourceV2ScanExec starts using ColumnarBatch scans
>>
>
> We will probably need two paths. One for columnar batches and one for
> row-based reads. That doesn't need to be done right away and what you
> already have in your working copy makes sense as a start.
>
>
>> That's a lot of questions! :-) but hope i'm making sense.
>>
>> -Gautam.
>>
>>
>>
>> [1] -
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Re: Approaching Vectorized Reading in Iceberg ..

Posted by Gautam <ga...@gmail.com>.
Also, I think the other thing that's fundamentally different is the way page
iteration and column iteration are done in Iceberg vs. the way value reading
happens in Spark's ValuesReader implementations.

On Wed, Jul 31, 2019 at 1:44 PM Gautam <ga...@gmail.com> wrote:

> Hey Samarth,
>               Sorry about the delay. I ran into some bottlenecks for which
> I had to add more code to be able to run benchmarks. I've checked in my
> latest changes to my fork's *vectorized-read* branch [0].
>
> Here are the early numbers from the initial implementation...
>
> *Benchmark Data:*
> - 10 files
> - 9MB each
> - 1 million rows each (1 row group)
>
> Ran the benchmark using the jmh tool under incubator-iceberg/spark/src/jmh
> with different batch sizes, and compared it to Spark's vectorized and
> non-vectorized readers.
>
> *Command: *
> ./gradlew clean :iceberg-spark:jmh \
>   -PjmhIncludeRegex=IcebergSourceFlatParquetDataReadBenchmark \
>   -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-read-benchmark-result.txt
>
>
>
> Benchmark  Mode  Cnt  Score  Error  Units
> IcebergSourceFlatParquetDataReadBenchmark.readFileSourceNonVectorized  ss  5  16.172 ± 0.750  s/op
> IcebergSourceFlatParquetDataReadBenchmark.readFileSourceVectorized  ss  5  6.430 ± 0.136  s/op
> IcebergSourceFlatParquetDataReadBenchmark.readIceberg  ss  5  15.287 ± 0.212  s/op
> IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized100k  ss  5  18.310 ± 0.498  s/op
> IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized10k  ss  5  18.020 ± 0.378  s/op
> IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized5k  ss  5  17.769 ± 0.412  s/op
> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionFileSourceNonVectorized  ss  5  2.794 ± 0.141  s/op
> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionFileSourceVectorized  ss  5  1.063 ± 0.140  s/op
> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIceberg  ss  5  2.966 ± 0.133  s/op
> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergVectorized100k  ss  5  2.015 ± 0.261  s/op
> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergVectorized10k  ss  5  1.972 ± 0.105  s/op
> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergVectorized5k  ss  5  2.065 ± 0.079  s/op
>
>
>
> So it seems vectorization is not adding any improvement over the
> non-vectorized read path. I'm currently trying to profile where the time
> is being spent.
>
> *Here is my initial speculation about why this is slow:*
>  - There's too much overhead that seems to come from creating the batches:
> I'm creating a new instance of ColumnarBatch on each read [1]. This should
> probably be reused (see the sketch after this list).
>  - Although I am reusing the *FieldVector* across batched reads [2], I
> wrap them in new *ArrowColumnVector*s [3] on each read call. I didn't
> think this would be a big deal but maybe it is.
>  - The filters are not being applied in columnar fashion; they are being
> applied row by row, as in Iceberg each filter visitor is stateless and
> applied separately on each row's column.
>  - I'm trying to re-use the BufferAllocator that Arrow provides [4].
> I don't know if there are other strategies for using it. Will look more into this.
>  - I'm batching until the row group ends and restricting the last batch to
> the row group boundary. I should probably spill over into the next row group
> to fill that batch. I don't know if this would help, as from what I can tell
> I don't think *VectorizedParquetRecordReader* does this.
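>
> A rough illustration of the first two points (hypothetical code, not what's
> in the branch): allocate the ColumnarBatch and the ArrowColumnVector
> wrappers once, and only refill the underlying Arrow FieldVectors per read.
>
> import org.apache.arrow.vector.FieldVector;
> import org.apache.spark.sql.vectorized.ArrowColumnVector;
> import org.apache.spark.sql.vectorized.ColumnVector;
> import org.apache.spark.sql.vectorized.ColumnarBatch;
>
> // Hypothetical sketch of batch/wrapper reuse across read calls.
> class ReusableBatchHolder {
>   private final ColumnarBatch batch;
>
>   ReusableBatchHolder(FieldVector[] vectors) {
>     ColumnVector[] columns = new ColumnVector[vectors.length];
>     for (int i = 0; i < vectors.length; i++) {
>       columns[i] = new ArrowColumnVector(vectors[i]);  // wrap once, not per batch
>     }
>     this.batch = new ColumnarBatch(columns);
>   }
>
>   ColumnarBatch next(int numRows) {
>     // the caller refills the backing FieldVectors before calling next()
>     batch.setNumRows(numRows);
>     return batch;
>   }
> }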
>
> I'll try to provide more insights once I improve my code. But if folks have
> other insights on where we can improve things, I'd gladly try
> them.
>
> Cheers,
> - Gautam.
>
> [0] - https://github.com/prodeezy/incubator-iceberg/tree/vectorized-read
> [1] -
> https://github.com/prodeezy/incubator-iceberg/blob/vectorized-read/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java#L655
> [2] -
> https://github.com/prodeezy/incubator-iceberg/blob/vectorized-read/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedParquetValueReaders.java#L108
> [3] -
> https://github.com/prodeezy/incubator-iceberg/blob/vectorized-read/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java#L651
> [4] -
> https://github.com/prodeezy/incubator-iceberg/blob/vectorized-read/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java#L92
>
>
> On Tue, Jul 30, 2019 at 5:13 PM Samarth Jain <sa...@gmail.com>
> wrote:
>
>> Hey Gautam,
>>
>> Wanted to check back with you and see if you had any success running the
>> benchmark and if you have any numbers to share.
>>
>>
>>
>> On Fri, Jul 26, 2019 at 4:34 PM Gautam <ga...@gmail.com> wrote:
>>
>>> Got it. Commented out that module and it works. Was just curious why it
>>> doesn't work on master branch either.
>>>
>>> On Fri, Jul 26, 2019 at 3:49 PM Daniel Weeks <dw...@netflix.com> wrote:
>>>
>>>> Actually, it looks like the issue is right there in the error . . . the
>>>> ErrorProne module is being excluded from the compile stages of the
>>>> sub-projects here:
>>>> https://github.com/apache/incubator-iceberg/blob/master/build.gradle#L152
>>>>
>>>> However, it is still being applied to the jmh tasks.  I'm not familiar
>>>> with this module, but you can run the benchmarks by commenting it out here:
>>>> https://github.com/apache/incubator-iceberg/blob/master/build.gradle#L167
>>>>
>>>> We'll need to fix the build to disable it for the jmh tasks.
>>>>
>>>> -Dan
>>>>
>>>> On Fri, Jul 26, 2019 at 3:34 PM Daniel Weeks <dw...@netflix.com>
>>>> wrote:
>>>>
>>>>> Gautam, you need to have the jmh-core libraries available to run.  I
>>>>> validated that PR, so I'm guessing I had it configured in my environment.
>>>>>
>>>>> I assume there's a way to make that available within gradle, so I'll
>>>>> take a look.
>>>>>
>>>>> On Fri, Jul 26, 2019 at 2:52 PM Gautam <ga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> This fails on master too btw. Just wondering if I'm doing
>>>>>> something wrong trying to run this.
>>>>>>
>>>>>> On Fri, Jul 26, 2019 at 2:24 PM Gautam <ga...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I've been trying to run the jmh benchmarks bundled within the
>>>>>>> project. I've been running into issues with that .. have others hit this? Am
>>>>>>> I running these incorrectly?
>>>>>>>
>>>>>>>
>>>>>>> bash-3.2$ ./gradlew :iceberg-spark:jmh
>>>>>>> -PjmhIncludeRegex=IcebergSourceFlatParquetDataFilterBenchmark
>>>>>>> -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-filter-benchmark-result.txt
>>>>>>> ..
>>>>>>> ...
>>>>>>> > Task :iceberg-spark:jmhCompileGeneratedClasses FAILED
>>>>>>> error: plug-in not found: ErrorProne
>>>>>>>
>>>>>>> FAILURE: Build failed with an exception.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Is there a config/plugin I need to add to build.gradle?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jul 24, 2019 at 2:03 PM Ryan Blue <rb...@netflix.com> wrote:
>>>>>>>
>>>>>>>> Thanks Gautam!
>>>>>>>>
>>>>>>>> We'll start taking a look at your code. What do you think about
>>>>>>>> creating a branch in the Iceberg repository where we can work on improving
>>>>>>>> it together, before merging it into master?
>>>>>>>>
>>>>>>>> Also, you mentioned performance comparisons. Do you have any early
>>>>>>>> results to share?
>>>>>>>>
>>>>>>>> rb
>>>>>>>>
>>>>>>>> On Tue, Jul 23, 2019 at 3:40 PM Gautam <ga...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello Folks,
>>>>>>>>>
>>>>>>>>> I have checked in a WIP branch [1] with a working version of
>>>>>>>>> Vectorized reads for Iceberg reader. Here's the diff  [2].
>>>>>>>>>
>>>>>>>>> *Implementation Notes:*
>>>>>>>>>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to
>>>>>>>>> instruct the DataSourceV2ScanExec to use `planBatchPartitions()` instead of
>>>>>>>>> the usual `planInputPartitions()`. It returns instances of `ColumnarBatch`
>>>>>>>>> on each iteration.
>>>>>>>>>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion.
>>>>>>>>> This was copied from [3] . Added by @Daniel Weeks
>>>>>>>>> <dw...@netflix.com> . Thanks for that!
>>>>>>>>>  - `VectorizedParquetValueReaders` contains ParquetValueReaders
>>>>>>>>> used for reading/decoding the Parquet rowgroups (aka pagestores as referred
>>>>>>>>> to in the code)
>>>>>>>>>  - `VectorizedSparkParquetReaders` contains the visitor
>>>>>>>>> implementations to map Parquet types to appropriate value readers. I
>>>>>>>>> implemented the struct visitor so that the root schema can be mapped
>>>>>>>>> properly. This has the added benefit of vectorization support for structs,
>>>>>>>>> so yay!
>>>>>>>>>  - For the initial version the value readers read an entire row
>>>>>>>>> group into a single Arrow FieldVector. I'd imagine this will require
>>>>>>>>> tuning to get the batch sizing right, but I've gone with one batch per
>>>>>>>>> row group for now.
>>>>>>>>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which
>>>>>>>>> is Spark's ColumnVector implementation backed by Arrow. This is the first
>>>>>>>>> contact point between Spark and Arrow interfaces.
>>>>>>>>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch`
>>>>>>>>> by `ColumnarBatchReader` (see the sketch after this list). This is my
>>>>>>>>> replacement for `InternalRowReader`, which maps structs to columnar
>>>>>>>>> batches. This allows us to have nested structs where each level of nesting
>>>>>>>>> would be a nested columnar batch. Lemme know what you think of this approach.
>>>>>>>>>  - I've added value readers for all supported primitive types
>>>>>>>>> listed in `AvroDataTest`. There's a corresponding test for the vectorized
>>>>>>>>> reader under `TestSparkParquetVectorizedReader`.
>>>>>>>>>  - I haven't fixed all the Checkstyle errors, so you will have to
>>>>>>>>> turn checkstyle off in build.gradle. Also skip tests while building..
>>>>>>>>> sorry! :-(
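>>>>>>>>>
>>>>>>>>> As a rough sketch of what I mean by stitching (hypothetical code,
>>>>>>>>> simplified; `VectorReader` is a stand-in for the per-column value
>>>>>>>>> readers, not a class in the branch):
>>>>>>>>>
>>>>>>>>> import java.util.List;
>>>>>>>>> import org.apache.arrow.vector.FieldVector;
>>>>>>>>> import org.apache.spark.sql.vectorized.ArrowColumnVector;
>>>>>>>>> import org.apache.spark.sql.vectorized.ColumnVector;
>>>>>>>>> import org.apache.spark.sql.vectorized.ColumnarBatch;
>>>>>>>>>
>>>>>>>>> // Hypothetical stand-in for a per-column reader that fills an Arrow vector.
>>>>>>>>> interface VectorReader {
>>>>>>>>>   FieldVector readBatch(int numRows);
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> // Stitches the per-column Arrow vectors into one Spark ColumnarBatch.
>>>>>>>>> class ColumnarBatchStitcher {
>>>>>>>>>   private final List<VectorReader> readers;
>>>>>>>>>
>>>>>>>>>   ColumnarBatchStitcher(List<VectorReader> readers) {
>>>>>>>>>     this.readers = readers;
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   ColumnarBatch read(int numRows) {
>>>>>>>>>     ColumnVector[] columns = new ColumnVector[readers.size()];
>>>>>>>>>     for (int i = 0; i < readers.size(); i++) {
>>>>>>>>>       columns[i] = new ArrowColumnVector(readers.get(i).readBatch(numRows));
>>>>>>>>>     }
>>>>>>>>>     ColumnarBatch batch = new ColumnarBatch(columns);
>>>>>>>>>     batch.setNumRows(numRows);
>>>>>>>>>     return batch;
>>>>>>>>>   }
>>>>>>>>> }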
>>>>>>>>>
>>>>>>>>> *P.S.* There's some unused code under ArrowReader.java; ignore it.
>>>>>>>>> It's from my previous implementation of vectorization. I've kept it
>>>>>>>>> around to compare performance.
>>>>>>>>>
>>>>>>>>> Lemme know what folks think of the approach. I'm getting this
>>>>>>>>> working for our scale test benchmark and will report back with numbers.
>>>>>>>>> Feel free to run your own benchmarks and share.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> -Gautam.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [1] -
>>>>>>>>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>>>>>>>>> [2] -
>>>>>>>>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>>>>>>>>> [3] -
>>>>>>>>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Jul 22, 2019 at 2:33 PM Gautam <ga...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Will do. Doing a bit of housekeeping on the code and also adding
>>>>>>>>>> more primitive type support.
>>>>>>>>>>
>>>>>>>>>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah <mc...@palantir.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Would it be possible to put the work in progress code in open
>>>>>>>>>>> source?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> *From: *Gautam <ga...@gmail.com>
>>>>>>>>>>> *Reply-To: *"dev@iceberg.apache.org" <de...@iceberg.apache.org>
>>>>>>>>>>> *Date: *Monday, July 22, 2019 at 9:46 AM
>>>>>>>>>>> *To: *Daniel Weeks <dw...@netflix.com>
>>>>>>>>>>> *Cc: *Ryan Blue <rb...@netflix.com>, Iceberg Dev List <
>>>>>>>>>>> dev@iceberg.apache.org>
>>>>>>>>>>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> That would be great!
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks <dw...@netflix.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hey Gautam,
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> We also have a couple people looking into vectorized reading
>>>>>>>>>>> (into Arrow memory).  I think it would be good for us to get together and
>>>>>>>>>>> see if we can collaborate on a common approach for this.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I'll reach out directly and see if we can get together.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Dan
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jul 21, 2019 at 10:35 PM Gautam <ga...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Figured this out. I'm returning ColumnarBatch iterator directly
>>>>>>>>>>> without projection with schema set appropriately in `readSchema() `.. the
>>>>>>>>>>> empty result was due to valuesRead not being set correctly on FileIterator.
>>>>>>>>>>> Did that and things are working. Will circle back with numbers soon.

Re: Approaching Vectorized Reading in Iceberg ..

Posted by Gautam <ga...@gmail.com>.
Ah yes, I didn't send over the filter benchmarks ..

Num files: 500
Num rows per file: 10,000

Benchmark  Mode  Cnt  Score  Error  Units
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterFileSourceNonVectorized  ss  5  3.837 ± 0.424  s/op
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterFileSourceVectorized  ss  5  3.964 ± 1.891  s/op
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIceberg  ss  5  0.272 ± 0.039  s/op
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergVect100k  ss  5  0.274 ± 0.013  s/op
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergVect10k  ss  5  0.275 ± 0.040  s/op
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergVect5k  ss  5  0.273 ± 0.031  s/op
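
For reference, these are IcebergSourceFlatParquetDataFilterBenchmark numbers;
the jmh invocation used for this benchmark earlier in the thread was:

./gradlew :iceberg-spark:jmh \
  -PjmhIncludeRegex=IcebergSourceFlatParquetDataFilterBenchmark \
  -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-filter-benchmark-result.txt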

On Wed, Jul 31, 2019 at 2:35 PM Anjali Norwood <an...@netflix.com.invalid>
wrote:

> Hi Gautam,
>
> You wrote: ' - The filters are not being applied in columnar fashion they
> are being applied row by row as in Iceberg each filter visitor is stateless
> and applied separately on each row's column. ' .. this should not be a
> problem for this particular benchmark, as
> IcebergSourceFlatParquetDataReadBenchmark does not apply filters.
>
> -Anjali.

Re: Approaching Vectorized Reading in Iceberg ..

Posted by Anjali Norwood <an...@netflix.com.INVALID>.
Hi Gautam,

You wrote: ' - The filters are not being applied in columnar fashion they
are being applied row by row as in Iceberg each filter visitor is stateless
and applied separately on each row's column. ' .. this should not be a
problem for this particular benchmark, as
IcebergSourceFlatParquetDataReadBenchmark does not apply filters.

-Anjali.

>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> We also have a couple people looking into vectorized reading
>>>>>>>>>>> (into Arrow memory).  I think it would be good for us to get together and
>>>>>>>>>>> see if we can collaborate on a common approach for this.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I'll reach out directly and see if we can get together.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Dan
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jul 21, 2019 at 10:35 PM Gautam <ga...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Figured this out. I'm returning ColumnarBatch iterator directly
>>>>>>>>>>> without projection with schema set appropriately in `readSchema() `.. the
>>>>>>>>>>> empty result was due to valuesRead not being set correctly on FileIterator.
>>>>>>>>>>> Did that and things are working. Will circle back with numbers soon.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jul 19, 2019 at 5:22 PM Gautam <ga...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hey Guys,
>>>>>>>>>>>
>>>>>>>>>>>            Sorry bout the delay on this. Just got back on
>>>>>>>>>>> getting a basic working implementation in Iceberg for Vectorization on
>>>>>>>>>>> primitive types.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> *Here's what I have so far :  *
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I have added `ParquetValueReader` implementations for some basic
>>>>>>>>>>> primitive types that build the respective Arrow Vector (`ValueVector`) viz.
>>>>>>>>>>> `IntVector` for int, `VarCharVector` for strings and so on. Underneath each
>>>>>>>>>>> value vector reader there are column iterators that read from the parquet
>>>>>>>>>>> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
>>>>>>>>>>> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
>>>>>>>>>>> stitched together using a `ColumnarBatchReader` (which as the name suggests
>>>>>>>>>>> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
>>>>>>>>>>> work properly with the underlying interfaces.  I'v also made changes to
>>>>>>>>>>> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
>>>>>>>>>>> `SupportsScanColumnarBatch` mixin to the reader).  So the reader now
>>>>>>>>>>> expects ColumnarBatch instances (instead of InternalRow). The query
>>>>>>>>>>> planning runtime works fine with these changes.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Although it fails during query execution, the bit it's
>>>>>>>>>>> currently failing at is this line of code : https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>>>>>>>>>>> [github.com]
>>>>>>>>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_incubator-2Diceberg_blob_master_spark_src_main_java_org_apache_iceberg_spark_source_Reader.java-23L414&d=DwMFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=hzwIMNQ9E99EMYGuqHI0kXhVbvX3nU3OSDadUnJxjAs&m=UW1Nb5KZOPeIqsjzFnKhGQaxYHT_wAI_2PvgFUlfAoY&s=7wzoBoRwCjQjgamnHukQSe0wiATMnGbYhfJQpXfSMks&e=>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> This code, I think,  tries to apply the iterator's schema
>>>>>>>>>>> projection on the InternalRow instances. This seems to be tightly coupled
>>>>>>>>>>> to InternalRow as Spark's catalyst expressions have implemented the
>>>>>>>>>>> UnsafeProjection for InternalRow only. If I take this out and just return
>>>>>>>>>>> the `Iterator<ColumnarBatch>` iterator I built it returns empty result on
>>>>>>>>>>> the client. I'm guessing this is coz Spark is unaware of the iterator's
>>>>>>>>>>> schema? There's a Todo in the code that says "*remove the
>>>>>>>>>>> projection by reporting the iterator's schema back to Spark*".
>>>>>>>>>>> Is there a simple way to communicate that to Spark for my new iterator? Any
>>>>>>>>>>> pointers on how to get around this?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks and Regards,
>>>>>>>>>>>
>>>>>>>>>>> -Gautam.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Replies inline.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jun 14, 2019 at 1:11 AM Gautam <ga...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Thanks for responding Ryan,
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Couple of follow up questions on ParquetValueReader for Arrow..
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I'd like to start with testing Arrow out with readers for
>>>>>>>>>>> primitive type and incrementally add in Struct/Array support, also
>>>>>>>>>>> ArrowWriter [1] currently doesn't have converters for map type. How can I
>>>>>>>>>>> default these types to regular materialization whilst supporting Arrow
>>>>>>>>>>> based support for primitives?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> We should look at what Spark does to handle maps.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I think we should get the prototype working with test cases that
>>>>>>>>>>> don't have maps, structs, or lists. Just getting primitives working is a
>>>>>>>>>>> good start and just won't hit these problems.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Lemme know if this makes sense...
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> - I extend  PrimitiveReader (for Arrow) that loads primitive
>>>>>>>>>>> types into ArrowColumnVectors of corresponding column types by iterating
>>>>>>>>>>> over underlying ColumnIterator *n times*, where n is size of
>>>>>>>>>>> batch.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Sounds good to me. I'm not sure about extending vs wrapping
>>>>>>>>>>> because I'm not too familiar with the Arrow APIs.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> - Reader.newParquetIterable()  maps primitive column types to
>>>>>>>>>>> the newly added ArrowParquetValueReader but for other types (nested types,
>>>>>>>>>>> etc.) uses current *InternalRow* based ValueReaders
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Sounds good for primitives, but I would just leave the nested
>>>>>>>>>>> types un-implemented for now.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> - Stitch the columns vectors together to create ColumnarBatch,
>>>>>>>>>>> (Since *SupportsScanColumnarBatch* mixin currently expects this
>>>>>>>>>>> ) .. *although* *I'm a bit lost on how the stitching of columns
>>>>>>>>>>> happens currently*? .. and how the ArrowColumnVectors could  be
>>>>>>>>>>> stitched alongside regular columns that don't have arrow based support ?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I don't think that you can mix regular columns and Arrow
>>>>>>>>>>> columns. It has to be all one or the other. That's why it's easier to start
>>>>>>>>>>> with primitives, then add structs, then lists, and finally maps.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*>
>>>>>>>>>>> *so that DataSourceV2ScanExec starts using ColumnarBatch scans
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> We will probably need two paths. One for columnar batches and
>>>>>>>>>>> one for row-based reads. That doesn't need to be done right away and what
>>>>>>>>>>> you already have in your working copy makes sense as a start.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> That's a lot of questions! :-) but hope i'm making sense.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Gautam.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> [1] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>>>>>>>>>> [github.com]
>>>>>>>>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_spark_blob_master_sql_core_src_main_scala_org_apache_spark_sql_execution_arrow_ArrowWriter.scala&d=DwMFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=hzwIMNQ9E99EMYGuqHI0kXhVbvX3nU3OSDadUnJxjAs&m=UW1Nb5KZOPeIqsjzFnKhGQaxYHT_wAI_2PvgFUlfAoY&s=8yzJh2S49rbuM06dC5Sy-yMECClqEeLS7tpg45BmDN4&e=>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>>
>>>>>>>>>>> Ryan Blue
>>>>>>>>>>>
>>>>>>>>>>> Software Engineer
>>>>>>>>>>>
>>>>>>>>>>> Netflix
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Ryan Blue
>>>>>>>> Software Engineer
>>>>>>>> Netflix
>>>>>>>>
>>>>>>>

Re: Approaching Vectorized Reading in Iceberg ..

Posted by Gautam <ga...@gmail.com>.
Hey Samarth,
              Sorry about the delay. I ran into some bottlenecks for which I
had to add more code to be able to run the benchmarks. I've checked in my latest
changes to my fork's *vectorized-read* branch [0].

Here's the early numbers on the initial implementation...

*Benchmark Data:*
- 10 files
- 9MB each
- 1 million rows (1 RowGroup)

I ran the benchmarks using the jmh benchmark tool within
incubator-iceberg/spark/src/jmh, using different batch sizes, and compared the
results against Spark's vectorized and non-vectorized readers.
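
For reference, this is roughly the shape of one of these benchmark methods. It's
only a minimal sketch: the class name, the setup details and the
"read.batch.size" option name are placeholders I'm using for illustration, not
the actual API in the WIP branch.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;

@State(Scope.Benchmark)
public class VectorizedReadBenchmarkSketch {
  private SparkSession spark;
  private String tableLocation = "/tmp/flat-parquet-table";  // 10 files, ~9MB each

  @Setup
  public void setup() {
    spark = SparkSession.builder().master("local").getOrCreate();
  }

  @TearDown
  public void tearDown() {
    spark.stop();
  }

  @Benchmark
  @BenchmarkMode(Mode.SingleShotTime)
  @Threads(1)
  public void readIcebergVectorized10k() {
    // "read.batch.size" stands in for whatever knob picks the rows per
    // ColumnarBatch (the 5k / 10k / 100k variants in the results below)
    Dataset<Row> df = spark.read()
        .format("iceberg")
        .option("read.batch.size", "10000")
        .load(tableLocation);
    df.foreach(row -> { });  // force a full read of every row
  }
}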

*Command: *
./gradlew clean   :iceberg-spark:jmh
 -PjmhIncludeRegex=IcebergSourceFlatParquetDataReadBenchmark
-PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-read-benchmark-result.txt



Benchmark                                                                             Mode  Cnt   Score   Error  Units
IcebergSourceFlatParquetDataReadBenchmark.readFileSourceNonVectorized                  ss    5  16.172 ± 0.750   s/op
IcebergSourceFlatParquetDataReadBenchmark.readFileSourceVectorized                     ss    5   6.430 ± 0.136   s/op
IcebergSourceFlatParquetDataReadBenchmark.readIceberg                                  ss    5  15.287 ± 0.212   s/op
IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized100k                    ss    5  18.310 ± 0.498   s/op
IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized10k                     ss    5  18.020 ± 0.378   s/op
IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized5k                      ss    5  17.769 ± 0.412   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionFileSourceNonVectorized    ss    5   2.794 ± 0.141   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionFileSourceVectorized       ss    5   1.063 ± 0.140   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIceberg                    ss    5   2.966 ± 0.133   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergVectorized100k      ss    5   2.015 ± 0.261   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergVectorized10k       ss    5   1.972 ± 0.105   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergVectorized5k        ss    5   2.065 ± 0.079   s/op



So it seems vectorization isn't adding any improvement over the non-vectorized
read path. I'm currently trying to profile where the time is being spent.

*Here is my initial speculation on why this is slow* (see the sketch after this
list for the batch re-use idea):
 - There's too much overhead that seems to come from creating the batches. I'm
creating a new instance of ColumnarBatch on each read [1]. This should probably
be re-used.
 - Although I am reusing the *FieldVector* across batched reads [2], I wrap them
in new *ArrowColumnVector*s [3] on each read call. I didn't think this would be
a big deal but maybe it is.
 - The filters are not being applied in a columnar fashion; they are applied row
by row, since in Iceberg each filter visitor is stateless and applied separately
on each row's column.
 - I'm trying to re-use the BufferAllocator that Arrow provides [4]. I don't know
if there are other strategies for using this. Will look more into this.
 - I'm batching until the rowgroup ends and restricting the last batch to the
rowgroup boundary. I should probably spill over to the next rowgroup to fill
that batch. I don't know if this would help, as from what I can tell
*VectorizedParquetRecordReader* doesn't do this.
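
To make the first two bullets concrete, here's a rough sketch of what re-using
the batch and the ArrowColumnVector wrappers could look like. The
ReusableBatchHolder class and its methods are made up for illustration; only the
Spark (ColumnarBatch, ArrowColumnVector) and Arrow (FieldVector, RootAllocator)
types are real.

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

// Hypothetical holder: allocate the ColumnarBatch and the ArrowColumnVector
// wrappers once per task, and only refill the underlying FieldVectors on each
// read call instead of re-creating and re-wrapping them every time.
class ReusableBatchHolder {
  // one shared allocator per task rather than one per column or per read
  static final RootAllocator ALLOCATOR = new RootAllocator(Long.MAX_VALUE);

  private final ColumnarBatch batch;  // allocated once, returned on every read

  ReusableBatchHolder(FieldVector[] vectors) {
    // wrap each reused FieldVector exactly once, not on every read call
    ColumnVector[] columns = new ColumnVector[vectors.length];
    for (int i = 0; i < vectors.length; i++) {
      columns[i] = new ArrowColumnVector(vectors[i]);
    }
    this.batch = new ColumnarBatch(columns);
  }

  // called after the value readers have refilled the same FieldVectors in place
  ColumnarBatch asBatch(int numRowsRead) {
    batch.setNumRows(numRowsRead);
    return batch;
  }
}

If something like this holds a batch across rowgroups, it would also cover the
last bullet: the final, partially filled batch of one rowgroup could be topped
up from the next rowgroup before being returned.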

I'll try and provide more insights once I improve my code. But if folks have
other insights on where we can improve things, I'd gladly try them.

Cheers,
- Gautam.

[0] - https://github.com/prodeezy/incubator-iceberg/tree/vectorized-read
[1] -
https://github.com/prodeezy/incubator-iceberg/blob/vectorized-read/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java#L655
[2] -
https://github.com/prodeezy/incubator-iceberg/blob/vectorized-read/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedParquetValueReaders.java#L108
[3] -
https://github.com/prodeezy/incubator-iceberg/blob/vectorized-read/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java#L651
[4] -
https://github.com/prodeezy/incubator-iceberg/blob/vectorized-read/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java#L92


On Tue, Jul 30, 2019 at 5:13 PM Samarth Jain <sa...@gmail.com> wrote:

> Hey Gautam,
>
> Wanted to check back with you and see if you had any success running the
> benchmark and if you have any numbers to share.
>
>
>
> On Fri, Jul 26, 2019 at 4:34 PM Gautam <ga...@gmail.com> wrote:
>
>> Got it. Commented out that module and it works. Was just curious why it
>> doesn't work on master branch either.
>>
>> On Fri, Jul 26, 2019 at 3:49 PM Daniel Weeks <dw...@netflix.com> wrote:
>>
>>> Actually, it looks like the issue is right there in the error . . . the
>>> ErrorProne module is being excluded from the compile stages of the
>>> sub-projects here:
>>> https://github.com/apache/incubator-iceberg/blob/master/build.gradle#L152
>>>
>>> However, it is still being applied to the jmh tasks.  I'm not familiar
>>> with this module, but you can run the benchmarks by commenting it out here:
>>> https://github.com/apache/incubator-iceberg/blob/master/build.gradle#L167
>>>
>>> We'll need to fix the build to disable for the jmh tasks.
>>>
>>> -Dan
>>>
>>> On Fri, Jul 26, 2019 at 3:34 PM Daniel Weeks <dw...@netflix.com> wrote:
>>>
>>>> Gautam, you need to have the jmh-core libraries available to run.  I
>>>> validated that PR, so I'm guessing I had it configured in my environment.
>>>>
>>>> I assume there's a way to make that available within gradle, so I'll
>>>> take a look.
>>>>
>>>> On Fri, Jul 26, 2019 at 2:52 PM Gautam <ga...@gmail.com> wrote:
>>>>
>>>>> This fails on master too btw. Just wondering if i'm doing
>>>>> something wrong trying to run this.
>>>>>
>>>>> On Fri, Jul 26, 2019 at 2:24 PM Gautam <ga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I'v been trying to run the jmh benchmarks bundled within the project.
>>>>>> I'v been running into issues with that .. have other hit this? Am I running
>>>>>> these incorrectly?
>>>>>>
>>>>>>
>>>>>> bash-3.2$ ./gradlew :iceberg-spark:jmh
>>>>>> -PjmhIncludeRegex=IcebergSourceFlatParquetDataFilterBenchmark
>>>>>> -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-filter-benchmark-result.txt
>>>>>> ..
>>>>>> ...
>>>>>> > Task :iceberg-spark:jmhCompileGeneratedClasses FAILED
>>>>>> error: plug-in not found: ErrorProne
>>>>>>
>>>>>> FAILURE: Build failed with an exception.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Is there a config/plugin I need to add to build.gradle?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Jul 24, 2019 at 2:03 PM Ryan Blue <rb...@netflix.com> wrote:
>>>>>>
>>>>>>> Thanks Gautam!
>>>>>>>
>>>>>>> We'll start taking a look at your code. What do you think about
>>>>>>> creating a branch in the Iceberg repository where we can work on improving
>>>>>>> it together, before merging it into master?
>>>>>>>
>>>>>>> Also, you mentioned performance comparisons. Do you have any early
>>>>>>> results to share?
>>>>>>>
>>>>>>> rb
>>>>>>>
>>>>>>> On Tue, Jul 23, 2019 at 3:40 PM Gautam <ga...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello Folks,
>>>>>>>>
>>>>>>>> I have checked in a WIP branch [1] with a working version of
>>>>>>>> Vectorized reads for Iceberg reader. Here's the diff  [2].
>>>>>>>>
>>>>>>>> *Implementation Notes:*
>>>>>>>>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to
>>>>>>>> instruct the DataSourceV2ScanExec to use `planBatchPartitions()` instead of
>>>>>>>> the usual `planInputPartitions()`. It returns instances of `ColumnarBatch`
>>>>>>>> on each iteration.
>>>>>>>>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion.
>>>>>>>> This was copied from [3] . Added by @Daniel Weeks
>>>>>>>> <dw...@netflix.com> . Thanks for that!
>>>>>>>>  - `VectorizedParquetValueReaders` contains ParquetValueReaders
>>>>>>>> used for reading/decoding the Parquet rowgroups (aka pagestores as referred
>>>>>>>> to in the code)
>>>>>>>>  - `VectorizedSparkParquetReaders` contains the visitor
>>>>>>>> implementations to map Parquet types to appropriate value readers. I
>>>>>>>> implemented the struct visitor so that the root schema can be mapped
>>>>>>>> properly. This has the added benefit of vectorization support for structs,
>>>>>>>> so yay!
>>>>>>>>  - For the initial version the value readers read an entire row
>>>>>>>> group into a single Arrow Field Vector. this i'd imagine will require
>>>>>>>> tuning for right batch sizing but i'v gone with one batch per rowgroup for
>>>>>>>> now.
>>>>>>>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which
>>>>>>>> is Spark's ColumnVector implementation backed by Arrow. This is the first
>>>>>>>> contact point between Spark and Arrow interfaces.
>>>>>>>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch`
>>>>>>>> by `ColumnarBatchReader` . This is my replacement for `InternalRowReader`
>>>>>>>> which maps Structs to Columnar Batches. This allows us to have nested
>>>>>>>> structs where each level of nesting would be a nested columnar batch. Lemme
>>>>>>>> know what you think of this approach.
>>>>>>>>  - I'v added value readers for all supported primitive types listed
>>>>>>>> in `AvroDataTest`. There's a corresponding test for vectorized reader under
>>>>>>>> `TestSparkParquetVectorizedReader`
>>>>>>>>  - I haven't fixed all the Checkstyle errors so you will have to
>>>>>>>> turn checkstyle off in build.gradle. Also skip tests while building..
>>>>>>>> sorry! :-(
>>>>>>>>
>>>>>>>> *P.S*. There's some unused code under ArrowReader.java. Ignore
>>>>>>>> this as it's not used. This was from my previous impl of Vectorization. I'v
>>>>>>>> kept it around to compare performance.
>>>>>>>>
>>>>>>>> Lemme know what folks think of the approach. I'm getting this
>>>>>>>> working for our scale test benchmark and will report back with numbers.
>>>>>>>> Feel free to run your own benchmarks and share.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> -Gautam.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> [1] -
>>>>>>>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>>>>>>>> [2] -
>>>>>>>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>>>>>>>> [3] -
>>>>>>>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Jul 22, 2019 at 2:33 PM Gautam <ga...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Will do. Doing a bit of housekeeping on the code and also adding
>>>>>>>>> more primitive type support.
>>>>>>>>>
>>>>>>>>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah <mc...@palantir.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Would it be possible to put the work in progress code in open
>>>>>>>>>> source?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *From: *Gautam <ga...@gmail.com>
>>>>>>>>>> *Reply-To: *"dev@iceberg.apache.org" <de...@iceberg.apache.org>
>>>>>>>>>> *Date: *Monday, July 22, 2019 at 9:46 AM
>>>>>>>>>> *To: *Daniel Weeks <dw...@netflix.com>
>>>>>>>>>> *Cc: *Ryan Blue <rb...@netflix.com>, Iceberg Dev List <
>>>>>>>>>> dev@iceberg.apache.org>
>>>>>>>>>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> That would be great!
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks <dw...@netflix.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hey Gautam,
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> We also have a couple people looking into vectorized reading
>>>>>>>>>> (into Arrow memory).  I think it would be good for us to get together and
>>>>>>>>>> see if we can collaborate on a common approach for this.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I'll reach out directly and see if we can get together.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Dan
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sun, Jul 21, 2019 at 10:35 PM Gautam <ga...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Figured this out. I'm returning ColumnarBatch iterator directly
>>>>>>>>>> without projection with schema set appropriately in `readSchema() `.. the
>>>>>>>>>> empty result was due to valuesRead not being set correctly on FileIterator.
>>>>>>>>>> Did that and things are working. Will circle back with numbers soon.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Jul 19, 2019 at 5:22 PM Gautam <ga...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hey Guys,
>>>>>>>>>>
>>>>>>>>>>            Sorry bout the delay on this. Just got back on getting
>>>>>>>>>> a basic working implementation in Iceberg for Vectorization on primitive
>>>>>>>>>> types.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *Here's what I have so far :  *
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I have added `ParquetValueReader` implementations for some basic
>>>>>>>>>> primitive types that build the respective Arrow Vector (`ValueVector`) viz.
>>>>>>>>>> `IntVector` for int, `VarCharVector` for strings and so on. Underneath each
>>>>>>>>>> value vector reader there are column iterators that read from the parquet
>>>>>>>>>> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
>>>>>>>>>> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
>>>>>>>>>> stitched together using a `ColumnarBatchReader` (which as the name suggests
>>>>>>>>>> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
>>>>>>>>>> work properly with the underlying interfaces.  I'v also made changes to
>>>>>>>>>> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
>>>>>>>>>> `SupportsScanColumnarBatch` mixin to the reader).  So the reader now
>>>>>>>>>> expects ColumnarBatch instances (instead of InternalRow). The query
>>>>>>>>>> planning runtime works fine with these changes.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Although it fails during query execution, the bit it's  currently
>>>>>>>>>> failing at is this line of code : https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>>>>>>>>>> [github.com]
>>>>>>>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_incubator-2Diceberg_blob_master_spark_src_main_java_org_apache_iceberg_spark_source_Reader.java-23L414&d=DwMFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=hzwIMNQ9E99EMYGuqHI0kXhVbvX3nU3OSDadUnJxjAs&m=UW1Nb5KZOPeIqsjzFnKhGQaxYHT_wAI_2PvgFUlfAoY&s=7wzoBoRwCjQjgamnHukQSe0wiATMnGbYhfJQpXfSMks&e=>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> This code, I think,  tries to apply the iterator's schema
>>>>>>>>>> projection on the InternalRow instances. This seems to be tightly coupled
>>>>>>>>>> to InternalRow as Spark's catalyst expressions have implemented the
>>>>>>>>>> UnsafeProjection for InternalRow only. If I take this out and just return
>>>>>>>>>> the `Iterator<ColumnarBatch>` iterator I built it returns empty result on
>>>>>>>>>> the client. I'm guessing this is coz Spark is unaware of the iterator's
>>>>>>>>>> schema? There's a Todo in the code that says "*remove the
>>>>>>>>>> projection by reporting the iterator's schema back to Spark*".
>>>>>>>>>> Is there a simple way to communicate that to Spark for my new iterator? Any
>>>>>>>>>> pointers on how to get around this?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks and Regards,
>>>>>>>>>>
>>>>>>>>>> -Gautam.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Replies inline.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Jun 14, 2019 at 1:11 AM Gautam <ga...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Thanks for responding Ryan,
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Couple of follow up questions on ParquetValueReader for Arrow..
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I'd like to start with testing Arrow out with readers for
>>>>>>>>>> primitive type and incrementally add in Struct/Array support, also
>>>>>>>>>> ArrowWriter [1] currently doesn't have converters for map type. How can I
>>>>>>>>>> default these types to regular materialization whilst supporting Arrow
>>>>>>>>>> based support for primitives?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> We should look at what Spark does to handle maps.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I think we should get the prototype working with test cases that
>>>>>>>>>> don't have maps, structs, or lists. Just getting primitives working is a
>>>>>>>>>> good start and just won't hit these problems.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Lemme know if this makes sense...
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> - I extend  PrimitiveReader (for Arrow) that loads primitive
>>>>>>>>>> types into ArrowColumnVectors of corresponding column types by iterating
>>>>>>>>>> over underlying ColumnIterator *n times*, where n is size of
>>>>>>>>>> batch.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Sounds good to me. I'm not sure about extending vs wrapping
>>>>>>>>>> because I'm not too familiar with the Arrow APIs.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> - Reader.newParquetIterable()  maps primitive column types to the
>>>>>>>>>> newly added ArrowParquetValueReader but for other types (nested types,
>>>>>>>>>> etc.) uses current *InternalRow* based ValueReaders
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Sounds good for primitives, but I would just leave the nested
>>>>>>>>>> types un-implemented for now.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> - Stitch the columns vectors together to create ColumnarBatch,
>>>>>>>>>> (Since *SupportsScanColumnarBatch* mixin currently expects this
>>>>>>>>>> ) .. *although* *I'm a bit lost on how the stitching of columns
>>>>>>>>>> happens currently*? .. and how the ArrowColumnVectors could  be
>>>>>>>>>> stitched alongside regular columns that don't have arrow based support ?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I don't think that you can mix regular columns and Arrow columns.
>>>>>>>>>> It has to be all one or the other. That's why it's easier to start with
>>>>>>>>>> primitives, then add structs, then lists, and finally maps.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so
>>>>>>>>>> that DataSourceV2ScanExec starts using ColumnarBatch scans
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> We will probably need two paths. One for columnar batches and one
>>>>>>>>>> for row-based reads. That doesn't need to be done right away and what you
>>>>>>>>>> already have in your working copy makes sense as a start.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> That's a lot of questions! :-) but hope i'm making sense.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Gautam.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> [1] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>>>>>>>>> [github.com]
>>>>>>>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_spark_blob_master_sql_core_src_main_scala_org_apache_spark_sql_execution_arrow_ArrowWriter.scala&d=DwMFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=hzwIMNQ9E99EMYGuqHI0kXhVbvX3nU3OSDadUnJxjAs&m=UW1Nb5KZOPeIqsjzFnKhGQaxYHT_wAI_2PvgFUlfAoY&s=8yzJh2S49rbuM06dC5Sy-yMECClqEeLS7tpg45BmDN4&e=>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>>
>>>>>>>>>> Ryan Blue
>>>>>>>>>>
>>>>>>>>>> Software Engineer
>>>>>>>>>>
>>>>>>>>>> Netflix
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Software Engineer
>>>>>>> Netflix
>>>>>>>
>>>>>>

Re: Approaching Vectorized Reading in Iceberg ..

Posted by Samarth Jain <sa...@gmail.com>.
Hey Gautam,

Wanted to check back with you and see if you had any success running the
benchmark and if you have any numbers to share.



On Fri, Jul 26, 2019 at 4:34 PM Gautam <ga...@gmail.com> wrote:

> Got it. Commented out that module and it works. Was just curious why it
> doesn't work on master branch either.
>
> On Fri, Jul 26, 2019 at 3:49 PM Daniel Weeks <dw...@netflix.com> wrote:
>
>> Actually, it looks like the issue is right there in the error . . . the
>> ErrorProne module is being excluded from the compile stages of the
>> sub-projects here:
>> https://github.com/apache/incubator-iceberg/blob/master/build.gradle#L152
>>
>> However, it is still being applied to the jmh tasks.  I'm not familiar
>> with this module, but you can run the benchmarks by commenting it out here:
>> https://github.com/apache/incubator-iceberg/blob/master/build.gradle#L167
>>
>> We'll need to fix the build to disable for the jmh tasks.
>>
>> -Dan
>>
>> On Fri, Jul 26, 2019 at 3:34 PM Daniel Weeks <dw...@netflix.com> wrote:
>>
>>> Gautam, you need to have the jmh-core libraries available to run.  I
>>> validated that PR, so I'm guessing I had it configured in my environment.
>>>
>>> I assume there's a way to make that available within gradle, so I'll
>>> take a look.
>>>
>>> On Fri, Jul 26, 2019 at 2:52 PM Gautam <ga...@gmail.com> wrote:
>>>
>>>> This fails on master too btw. Just wondering if i'm doing
>>>> something wrong trying to run this.
>>>>
>>>> On Fri, Jul 26, 2019 at 2:24 PM Gautam <ga...@gmail.com> wrote:
>>>>
>>>>> I'v been trying to run the jmh benchmarks bundled within the project.
>>>>> I'v been running into issues with that .. have other hit this? Am I running
>>>>> these incorrectly?
>>>>>
>>>>>
>>>>> bash-3.2$ ./gradlew :iceberg-spark:jmh
>>>>> -PjmhIncludeRegex=IcebergSourceFlatParquetDataFilterBenchmark
>>>>> -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-filter-benchmark-result.txt
>>>>> ..
>>>>> ...
>>>>> > Task :iceberg-spark:jmhCompileGeneratedClasses FAILED
>>>>> error: plug-in not found: ErrorProne
>>>>>
>>>>> FAILURE: Build failed with an exception.
>>>>>
>>>>>
>>>>>
>>>>> Is there a config/plugin I need to add to build.gradle?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Jul 24, 2019 at 2:03 PM Ryan Blue <rb...@netflix.com> wrote:
>>>>>
>>>>>> Thanks Gautam!
>>>>>>
>>>>>> We'll start taking a look at your code. What do you think about
>>>>>> creating a branch in the Iceberg repository where we can work on improving
>>>>>> it together, before merging it into master?
>>>>>>
>>>>>> Also, you mentioned performance comparisons. Do you have any early
>>>>>> results to share?
>>>>>>
>>>>>> rb
>>>>>>
>>>>>> On Tue, Jul 23, 2019 at 3:40 PM Gautam <ga...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello Folks,
>>>>>>>
>>>>>>> I have checked in a WIP branch [1] with a working version of
>>>>>>> Vectorized reads for Iceberg reader. Here's the diff  [2].
>>>>>>>
>>>>>>> *Implementation Notes:*
>>>>>>>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to
>>>>>>> instruct the DataSourceV2ScanExec to use `planBatchPartitions()` instead of
>>>>>>> the usual `planInputPartitions()`. It returns instances of `ColumnarBatch`
>>>>>>> on each iteration.
>>>>>>>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion. This
>>>>>>> was copied from [3] . Added by @Daniel Weeks <dw...@netflix.com> .
>>>>>>> Thanks for that!
>>>>>>>  - `VectorizedParquetValueReaders` contains ParquetValueReaders used
>>>>>>> for reading/decoding the Parquet rowgroups (aka pagestores as referred to
>>>>>>> in the code)
>>>>>>>  - `VectorizedSparkParquetReaders` contains the visitor
>>>>>>> implementations to map Parquet types to appropriate value readers. I
>>>>>>> implemented the struct visitor so that the root schema can be mapped
>>>>>>> properly. This has the added benefit of vectorization support for structs,
>>>>>>> so yay!
>>>>>>>  - For the initial version the value readers read an entire row
>>>>>>> group into a single Arrow Field Vector. this i'd imagine will require
>>>>>>> tuning for right batch sizing but i'v gone with one batch per rowgroup for
>>>>>>> now.
>>>>>>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which
>>>>>>> is Spark's ColumnVector implementation backed by Arrow. This is the first
>>>>>>> contact point between Spark and Arrow interfaces.
>>>>>>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch`
>>>>>>> by `ColumnarBatchReader` . This is my replacement for `InternalRowReader`
>>>>>>> which maps Structs to Columnar Batches. This allows us to have nested
>>>>>>> structs where each level of nesting would be a nested columnar batch. Lemme
>>>>>>> know what you think of this approach.
>>>>>>>  - I'v added value readers for all supported primitive types listed
>>>>>>> in `AvroDataTest`. There's a corresponding test for vectorized reader under
>>>>>>> `TestSparkParquetVectorizedReader`
>>>>>>>  - I haven't fixed all the Checkstyle errors so you will have to
>>>>>>> turn checkstyle off in build.gradle. Also skip tests while building..
>>>>>>> sorry! :-(
>>>>>>>
>>>>>>> *P.S*. There's some unused code under ArrowReader.java. Ignore this
>>>>>>> as it's not used. This was from my previous impl of Vectorization. I'v kept
>>>>>>> it around to compare performance.
>>>>>>>
>>>>>>> Lemme know what folks think of the approach. I'm getting this
>>>>>>> working for our scale test benchmark and will report back with numbers.
>>>>>>> Feel free to run your own benchmarks and share.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> -Gautam.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> [1] -
>>>>>>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>>>>>>> [2] -
>>>>>>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>>>>>>> [3] -
>>>>>>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jul 22, 2019 at 2:33 PM Gautam <ga...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Will do. Doing a bit of housekeeping on the code and also adding
>>>>>>>> more primitive type support.
>>>>>>>>
>>>>>>>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah <mc...@palantir.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Would it be possible to put the work in progress code in open
>>>>>>>>> source?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *From: *Gautam <ga...@gmail.com>
>>>>>>>>> *Reply-To: *"dev@iceberg.apache.org" <de...@iceberg.apache.org>
>>>>>>>>> *Date: *Monday, July 22, 2019 at 9:46 AM
>>>>>>>>> *To: *Daniel Weeks <dw...@netflix.com>
>>>>>>>>> *Cc: *Ryan Blue <rb...@netflix.com>, Iceberg Dev List <
>>>>>>>>> dev@iceberg.apache.org>
>>>>>>>>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> That would be great!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks <dw...@netflix.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hey Gautam,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> We also have a couple people looking into vectorized reading (into
>>>>>>>>> Arrow memory).  I think it would be good for us to get together and see if
>>>>>>>>> we can collaborate on a common approach for this.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I'll reach out directly and see if we can get together.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Dan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sun, Jul 21, 2019 at 10:35 PM Gautam <ga...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Figured this out. I'm returning ColumnarBatch iterator directly
>>>>>>>>> without projection with schema set appropriately in `readSchema() `.. the
>>>>>>>>> empty result was due to valuesRead not being set correctly on FileIterator.
>>>>>>>>> Did that and things are working. Will circle back with numbers soon.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Jul 19, 2019 at 5:22 PM Gautam <ga...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hey Guys,
>>>>>>>>>
>>>>>>>>>            Sorry bout the delay on this. Just got back on getting
>>>>>>>>> a basic working implementation in Iceberg for Vectorization on primitive
>>>>>>>>> types.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *Here's what I have so far :  *
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I have added `ParquetValueReader` implementations for some basic
>>>>>>>>> primitive types that build the respective Arrow Vector (`ValueVector`) viz.
>>>>>>>>> `IntVector` for int, `VarCharVector` for strings and so on. Underneath each
>>>>>>>>> value vector reader there are column iterators that read from the parquet
>>>>>>>>> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
>>>>>>>>> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
>>>>>>>>> stitched together using a `ColumnarBatchReader` (which as the name suggests
>>>>>>>>> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
>>>>>>>>> work properly with the underlying interfaces.  I'v also made changes to
>>>>>>>>> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
>>>>>>>>> `SupportsScanColumnarBatch` mixin to the reader).  So the reader now
>>>>>>>>> expects ColumnarBatch instances (instead of InternalRow). The query
>>>>>>>>> planning runtime works fine with these changes.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Although it fails during query execution, the bit it's  currently
>>>>>>>>> failing at is this line of code : https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>>>>>>>>> [github.com]
>>>>>>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_incubator-2Diceberg_blob_master_spark_src_main_java_org_apache_iceberg_spark_source_Reader.java-23L414&d=DwMFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=hzwIMNQ9E99EMYGuqHI0kXhVbvX3nU3OSDadUnJxjAs&m=UW1Nb5KZOPeIqsjzFnKhGQaxYHT_wAI_2PvgFUlfAoY&s=7wzoBoRwCjQjgamnHukQSe0wiATMnGbYhfJQpXfSMks&e=>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This code, I think,  tries to apply the iterator's schema
>>>>>>>>> projection on the InternalRow instances. This seems to be tightly coupled
>>>>>>>>> to InternalRow as Spark's catalyst expressions have implemented the
>>>>>>>>> UnsafeProjection for InternalRow only. If I take this out and just return
>>>>>>>>> the `Iterator<ColumnarBatch>` iterator I built it returns empty result on
>>>>>>>>> the client. I'm guessing this is coz Spark is unaware of the iterator's
>>>>>>>>> schema? There's a Todo in the code that says "*remove the
>>>>>>>>> projection by reporting the iterator's schema back to Spark*".
>>>>>>>>> Is there a simple way to communicate that to Spark for my new iterator? Any
>>>>>>>>> pointers on how to get around this?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks and Regards,
>>>>>>>>>
>>>>>>>>> -Gautam.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Replies inline.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Jun 14, 2019 at 1:11 AM Gautam <ga...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Thanks for responding Ryan,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Couple of follow up questions on ParquetValueReader for Arrow..
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I'd like to start with testing Arrow out with readers for
>>>>>>>>> primitive type and incrementally add in Struct/Array support, also
>>>>>>>>> ArrowWriter [1] currently doesn't have converters for map type. How can I
>>>>>>>>> default these types to regular materialization whilst supporting Arrow
>>>>>>>>> based support for primitives?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> We should look at what Spark does to handle maps.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I think we should get the prototype working with test cases that
>>>>>>>>> don't have maps, structs, or lists. Just getting primitives working is a
>>>>>>>>> good start and just won't hit these problems.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Lemme know if this makes sense...
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> - I extend  PrimitiveReader (for Arrow) that loads primitive types
>>>>>>>>> into ArrowColumnVectors of corresponding column types by iterating over
>>>>>>>>> underlying ColumnIterator *n times*, where n is size of batch.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Sounds good to me. I'm not sure about extending vs wrapping
>>>>>>>>> because I'm not too familiar with the Arrow APIs.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> - Reader.newParquetIterable()  maps primitive column types to the
>>>>>>>>> newly added ArrowParquetValueReader but for other types (nested types,
>>>>>>>>> etc.) uses current *InternalRow* based ValueReaders
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Sounds good for primitives, but I would just leave the nested
>>>>>>>>> types un-implemented for now.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> - Stitch the columns vectors together to create ColumnarBatch,
>>>>>>>>> (Since *SupportsScanColumnarBatch* mixin currently expects this )
>>>>>>>>> .. *although* *I'm a bit lost on how the stitching of columns
>>>>>>>>> happens currently*? .. and how the ArrowColumnVectors could  be
>>>>>>>>> stitched alongside regular columns that don't have arrow based support ?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I don't think that you can mix regular columns and Arrow columns.
>>>>>>>>> It has to be all one or the other. That's why it's easier to start with
>>>>>>>>> primitives, then add structs, then lists, and finally maps.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so
>>>>>>>>> that DataSourceV2ScanExec starts using ColumnarBatch scans
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> We will probably need two paths. One for columnar batches and one
>>>>>>>>> for row-based reads. That doesn't need to be done right away and what you
>>>>>>>>> already have in your working copy makes sense as a start.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> That's a lot of questions! :-) but hope i'm making sense.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Gautam.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [1] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>>>>>>>> [github.com]
>>>>>>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_spark_blob_master_sql_core_src_main_scala_org_apache_spark_sql_execution_arrow_ArrowWriter.scala&d=DwMFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=hzwIMNQ9E99EMYGuqHI0kXhVbvX3nU3OSDadUnJxjAs&m=UW1Nb5KZOPeIqsjzFnKhGQaxYHT_wAI_2PvgFUlfAoY&s=8yzJh2S49rbuM06dC5Sy-yMECClqEeLS7tpg45BmDN4&e=>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>>
>>>>>>>>> Ryan Blue
>>>>>>>>>
>>>>>>>>> Software Engineer
>>>>>>>>>
>>>>>>>>> Netflix
>>>>>>>>>
>>>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>>>>
>>>>>

Re: Approaching Vectorized Reading in Iceberg ..

Posted by Gautam <ga...@gmail.com>.
Got it. Commented out that module and it works. I was just curious why it
doesn't work on the master branch either.

On Fri, Jul 26, 2019 at 3:49 PM Daniel Weeks <dw...@netflix.com> wrote:

> Actually, it looks like the issue is right there in the error . . . the
> ErrorProne module is being excluded from the compile stages of the
> sub-projects here:
> https://github.com/apache/incubator-iceberg/blob/master/build.gradle#L152
>
> However, it is still being applied to the jmh tasks.  I'm not familiar
> with this module, but you can run the benchmarks by commenting it out here:
> https://github.com/apache/incubator-iceberg/blob/master/build.gradle#L167
>
> We'll need to fix the build to disable for the jmh tasks.
>
> -Dan
>
> On Fri, Jul 26, 2019 at 3:34 PM Daniel Weeks <dw...@netflix.com> wrote:
>
>> Gautam, you need to have the jmh-core libraries available to run.  I
>> validated that PR, so I'm guessing I had it configured in my environment.
>>
>> I assume there's a way to make that available within gradle, so I'll take
>> a look.
>>
>> On Fri, Jul 26, 2019 at 2:52 PM Gautam <ga...@gmail.com> wrote:
>>
>>> This fails on master too btw. Just wondering if i'm doing
>>> something wrong trying to run this.
>>>
>>> On Fri, Jul 26, 2019 at 2:24 PM Gautam <ga...@gmail.com> wrote:
>>>
>>>> I'v been trying to run the jmh benchmarks bundled within the project.
>>>> I'v been running into issues with that .. have other hit this? Am I running
>>>> these incorrectly?
>>>>
>>>>
>>>> bash-3.2$ ./gradlew :iceberg-spark:jmh
>>>> -PjmhIncludeRegex=IcebergSourceFlatParquetDataFilterBenchmark
>>>> -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-filter-benchmark-result.txt
>>>> ..
>>>> ...
>>>> > Task :iceberg-spark:jmhCompileGeneratedClasses FAILED
>>>> error: plug-in not found: ErrorProne
>>>>
>>>> FAILURE: Build failed with an exception.
>>>>
>>>>
>>>>
>>>> Is there a config/plugin I need to add to build.gradle?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Jul 24, 2019 at 2:03 PM Ryan Blue <rb...@netflix.com> wrote:
>>>>
>>>>> Thanks Gautam!
>>>>>
>>>>> We'll start taking a look at your code. What do you think about
>>>>> creating a branch in the Iceberg repository where we can work on improving
>>>>> it together, before merging it into master?
>>>>>
>>>>> Also, you mentioned performance comparisons. Do you have any early
>>>>> results to share?
>>>>>
>>>>> rb
>>>>>
>>>>> On Tue, Jul 23, 2019 at 3:40 PM Gautam <ga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello Folks,
>>>>>>
>>>>>> I have checked in a WIP branch [1] with a working version of
>>>>>> Vectorized reads for Iceberg reader. Here's the diff  [2].
>>>>>>
>>>>>> *Implementation Notes:*
>>>>>>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to
>>>>>> instruct the DataSourceV2ScanExec to use `planBatchPartitions()` instead of
>>>>>> the usual `planInputPartitions()`. It returns instances of `ColumnarBatch`
>>>>>> on each iteration.
>>>>>>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion. This
>>>>>> was copied from [3] . Added by @Daniel Weeks <dw...@netflix.com> .
>>>>>> Thanks for that!
>>>>>>  - `VectorizedParquetValueReaders` contains ParquetValueReaders used
>>>>>> for reading/decoding the Parquet rowgroups (aka pagestores as referred to
>>>>>> in the code)
>>>>>>  - `VectorizedSparkParquetReaders` contains the visitor
>>>>>> implementations to map Parquet types to appropriate value readers. I
>>>>>> implemented the struct visitor so that the root schema can be mapped
>>>>>> properly. This has the added benefit of vectorization support for structs,
>>>>>> so yay!
>>>>>>  - For the initial version the value readers read an entire row group
>>>>>> into a single Arrow Field Vector. this i'd imagine will require tuning for
>>>>>> right batch sizing but i'v gone with one batch per rowgroup for now.
>>>>>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which is
>>>>>> Spark's ColumnVector implementation backed by Arrow. This is the first
>>>>>> contact point between Spark and Arrow interfaces.
>>>>>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch` by
>>>>>> `ColumnarBatchReader` . This is my replacement for `InternalRowReader`
>>>>>> which maps Structs to Columnar Batches. This allows us to have nested
>>>>>> structs where each level of nesting would be a nested columnar batch. Lemme
>>>>>> know what you think of this approach.
>>>>>>  - I'v added value readers for all supported primitive types listed
>>>>>> in `AvroDataTest`. There's a corresponding test for vectorized reader under
>>>>>> `TestSparkParquetVectorizedReader`
>>>>>>  - I haven't fixed all the Checkstyle errors so you will have to turn
>>>>>> checkstyle off in build.gradle. Also skip tests while building.. sorry! :-(
>>>>>>
>>>>>> *P.S*. There's some unused code under ArrowReader.java. Ignore this
>>>>>> as it's not used. This was from my previous impl of Vectorization. I'v kept
>>>>>> it around to compare performance.
>>>>>>
>>>>>> Lemme know what folks think of the approach. I'm getting this working
>>>>>> for our scale test benchmark and will report back with numbers. Feel free
>>>>>> to run your own benchmarks and share.
>>>>>>
>>>>>> Cheers,
>>>>>> -Gautam.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> [1] -
>>>>>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>>>>>> [2] -
>>>>>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>>>>>> [3] -
>>>>>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>>>>>
>>>>>>
>>>>>> On Mon, Jul 22, 2019 at 2:33 PM Gautam <ga...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Will do. Doing a bit of housekeeping on the code and also adding
>>>>>>> more primitive type support.
>>>>>>>
>>>>>>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah <mc...@palantir.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Would it be possible to put the work in progress code in open
>>>>>>>> source?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> *From: *Gautam <ga...@gmail.com>
>>>>>>>> *Reply-To: *"dev@iceberg.apache.org" <de...@iceberg.apache.org>
>>>>>>>> *Date: *Monday, July 22, 2019 at 9:46 AM
>>>>>>>> *To: *Daniel Weeks <dw...@netflix.com>
>>>>>>>> *Cc: *Ryan Blue <rb...@netflix.com>, Iceberg Dev List <
>>>>>>>> dev@iceberg.apache.org>
>>>>>>>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> That would be great!
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks <dw...@netflix.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hey Gautam,
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> We also have a couple people looking into vectorized reading (into
>>>>>>>> Arrow memory).  I think it would be good for us to get together and see if
>>>>>>>> we can collaborate on a common approach for this.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> I'll reach out directly and see if we can get together.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> -Dan
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Jul 21, 2019 at 10:35 PM Gautam <ga...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Figured this out. I'm returning ColumnarBatch iterator directly
>>>>>>>> without projection with schema set appropriately in `readSchema() `.. the
>>>>>>>> empty result was due to valuesRead not being set correctly on FileIterator.
>>>>>>>> Did that and things are working. Will circle back with numbers soon.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jul 19, 2019 at 5:22 PM Gautam <ga...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hey Guys,
>>>>>>>>
>>>>>>>>            Sorry bout the delay on this. Just got back on getting a
>>>>>>>> basic working implementation in Iceberg for Vectorization on primitive
>>>>>>>> types.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> *Here's what I have so far :  *
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> I have added `ParquetValueReader` implementations for some basic
>>>>>>>> primitive types that build the respective Arrow Vector (`ValueVector`) viz.
>>>>>>>> `IntVector` for int, `VarCharVector` for strings and so on. Underneath each
>>>>>>>> value vector reader there are column iterators that read from the parquet
>>>>>>>> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
>>>>>>>> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
>>>>>>>> stitched together using a `ColumnarBatchReader` (which as the name suggests
>>>>>>>> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
>>>>>>>> work properly with the underlying interfaces.  I'v also made changes to
>>>>>>>> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
>>>>>>>> `SupportsScanColumnarBatch` mixin to the reader).  So the reader now
>>>>>>>> expects ColumnarBatch instances (instead of InternalRow). The query
>>>>>>>> planning runtime works fine with these changes.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Although it fails during query execution, the bit it's  currently
>>>>>>>> failing at is this line of code : https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>>>>>>>> [github.com]
>>>>>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_incubator-2Diceberg_blob_master_spark_src_main_java_org_apache_iceberg_spark_source_Reader.java-23L414&d=DwMFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=hzwIMNQ9E99EMYGuqHI0kXhVbvX3nU3OSDadUnJxjAs&m=UW1Nb5KZOPeIqsjzFnKhGQaxYHT_wAI_2PvgFUlfAoY&s=7wzoBoRwCjQjgamnHukQSe0wiATMnGbYhfJQpXfSMks&e=>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> This code, I think,  tries to apply the iterator's schema
>>>>>>>> projection on the InternalRow instances. This seems to be tightly coupled
>>>>>>>> to InternalRow as Spark's catalyst expressions have implemented the
>>>>>>>> UnsafeProjection for InternalRow only. If I take this out and just return
>>>>>>>> the `Iterator<ColumnarBatch>` iterator I built it returns empty result on
>>>>>>>> the client. I'm guessing this is coz Spark is unaware of the iterator's
>>>>>>>> schema? There's a Todo in the code that says "*remove the
>>>>>>>> projection by reporting the iterator's schema back to Spark*".  Is
>>>>>>>> there a simple way to communicate that to Spark for my new iterator? Any
>>>>>>>> pointers on how to get around this?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks and Regards,
>>>>>>>>
>>>>>>>> -Gautam.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Replies inline.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jun 14, 2019 at 1:11 AM Gautam <ga...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Thanks for responding Ryan,
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Couple of follow up questions on ParquetValueReader for Arrow..
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> I'd like to start with testing Arrow out with readers for primitive
>>>>>>>> type and incrementally add in Struct/Array support, also ArrowWriter [1]
>>>>>>>> currently doesn't have converters for map type. How can I default these
>>>>>>>> types to regular materialization whilst supporting Arrow based support for
>>>>>>>> primitives?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> We should look at what Spark does to handle maps.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> I think we should get the prototype working with test cases that
>>>>>>>> don't have maps, structs, or lists. Just getting primitives working is a
>>>>>>>> good start and just won't hit these problems.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Lemme know if this makes sense...
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> - I extend  PrimitiveReader (for Arrow) that loads primitive types
>>>>>>>> into ArrowColumnVectors of corresponding column types by iterating over
>>>>>>>> underlying ColumnIterator *n times*, where n is size of batch.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Sounds good to me. I'm not sure about extending vs wrapping because
>>>>>>>> I'm not too familiar with the Arrow APIs.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> - Reader.newParquetIterable()  maps primitive column types to the
>>>>>>>> newly added ArrowParquetValueReader but for other types (nested types,
>>>>>>>> etc.) uses current *InternalRow* based ValueReaders
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Sounds good for primitives, but I would just leave the nested types
>>>>>>>> un-implemented for now.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> - Stitch the columns vectors together to create ColumnarBatch,
>>>>>>>> (Since *SupportsScanColumnarBatch* mixin currently expects this )
>>>>>>>> .. *although* *I'm a bit lost on how the stitching of columns
>>>>>>>> happens currently*? .. and how the ArrowColumnVectors could  be
>>>>>>>> stitched alongside regular columns that don't have arrow based support ?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> I don't think that you can mix regular columns and Arrow columns.
>>>>>>>> It has to be all one or the other. That's why it's easier to start with
>>>>>>>> primitives, then add structs, then lists, and finally maps.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so
>>>>>>>> that DataSourceV2ScanExec starts using ColumnarBatch scans
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> We will probably need two paths. One for columnar batches and one
>>>>>>>> for row-based reads. That doesn't need to be done right away and what you
>>>>>>>> already have in your working copy makes sense as a start.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> That's a lot of questions! :-) but hope i'm making sense.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> -Gautam.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> [1] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Ryan Blue
>>>>>>>>
>>>>>>>> Software Engineer
>>>>>>>>
>>>>>>>> Netflix
>>>>>>>>
>>>>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>

Re: Approaching Vectorized Reading in Iceberg ..

Posted by Daniel Weeks <dw...@netflix.com.INVALID>.
Actually, it looks like the issue is right there in the error . . . the
ErrorProne module is being excluded from the compile stages of the
sub-projects here:
https://github.com/apache/incubator-iceberg/blob/master/build.gradle#L152

However, it is still being applied to the jmh tasks.  I'm not familiar with
this module, but you can run the benchmarks by commenting it out here:
https://github.com/apache/incubator-iceberg/blob/master/build.gradle#L167

We'll need to fix the build to disable it for the jmh tasks.

-Dan

On Fri, Jul 26, 2019 at 3:34 PM Daniel Weeks <dw...@netflix.com> wrote:

> Gautam, you need to have the jmh-core libraries available to run.  I
> validated that PR, so I'm guessing I had it configured in my environment.
>
> I assume there's a way to make that available within gradle, so I'll take
> a look.
>
> On Fri, Jul 26, 2019 at 2:52 PM Gautam <ga...@gmail.com> wrote:
>
>> This fails on master too btw. Just wondering if i'm doing something wrong
>> trying to run this.
>>
>> On Fri, Jul 26, 2019 at 2:24 PM Gautam <ga...@gmail.com> wrote:
>>
>>> I'v been trying to run the jmh benchmarks bundled within the project.
>>> I'v been running into issues with that .. have other hit this? Am I running
>>> these incorrectly?
>>>
>>>
>>> bash-3.2$ ./gradlew :iceberg-spark:jmh
>>> -PjmhIncludeRegex=IcebergSourceFlatParquetDataFilterBenchmark
>>> -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-filter-benchmark-result.txt
>>> ..
>>> ...
>>> > Task :iceberg-spark:jmhCompileGeneratedClasses FAILED
>>> error: plug-in not found: ErrorProne
>>>
>>> FAILURE: Build failed with an exception.
>>>
>>>
>>>
>>> Is there a config/plugin I need to add to build.gradle?
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Jul 24, 2019 at 2:03 PM Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> Thanks Gautam!
>>>>
>>>> We'll start taking a look at your code. What do you think about
>>>> creating a branch in the Iceberg repository where we can work on improving
>>>> it together, before merging it into master?
>>>>
>>>> Also, you mentioned performance comparisons. Do you have any early
>>>> results to share?
>>>>
>>>> rb
>>>>
>>>> On Tue, Jul 23, 2019 at 3:40 PM Gautam <ga...@gmail.com> wrote:
>>>>
>>>>> Hello Folks,
>>>>>
>>>>> I have checked in a WIP branch [1] with a working version of
>>>>> Vectorized reads for Iceberg reader. Here's the diff  [2].
>>>>>
>>>>> *Implementation Notes:*
>>>>>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to
>>>>> instruct the DataSourceV2ScanExec to use `planBatchPartitions()` instead of
>>>>> the usual `planInputPartitions()`. It returns instances of `ColumnarBatch`
>>>>> on each iteration.
>>>>>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion. This
>>>>> was copied from [3] . Added by @Daniel Weeks <dw...@netflix.com> .
>>>>> Thanks for that!
>>>>>  - `VectorizedParquetValueReaders` contains ParquetValueReaders used
>>>>> for reading/decoding the Parquet rowgroups (aka pagestores as referred to
>>>>> in the code)
>>>>>  - `VectorizedSparkParquetReaders` contains the visitor
>>>>> implementations to map Parquet types to appropriate value readers. I
>>>>> implemented the struct visitor so that the root schema can be mapped
>>>>> properly. This has the added benefit of vectorization support for structs,
>>>>> so yay!
>>>>>  - For the initial version the value readers read an entire row group
>>>>> into a single Arrow Field Vector. this i'd imagine will require tuning for
>>>>> right batch sizing but i'v gone with one batch per rowgroup for now.
>>>>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which is
>>>>> Spark's ColumnVector implementation backed by Arrow. This is the first
>>>>> contact point between Spark and Arrow interfaces.
>>>>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch` by
>>>>> `ColumnarBatchReader` . This is my replacement for `InternalRowReader`
>>>>> which maps Structs to Columnar Batches. This allows us to have nested
>>>>> structs where each level of nesting would be a nested columnar batch. Lemme
>>>>> know what you think of this approach.
>>>>>  - I'v added value readers for all supported primitive types listed in
>>>>> `AvroDataTest`. There's a corresponding test for vectorized reader under
>>>>> `TestSparkParquetVectorizedReader`
>>>>>  - I haven't fixed all the Checkstyle errors so you will have to turn
>>>>> checkstyle off in build.gradle. Also skip tests while building.. sorry! :-(
>>>>>
>>>>> *P.S*. There's some unused code under ArrowReader.java. Ignore this
>>>>> as it's not used. This was from my previous impl of Vectorization. I'v kept
>>>>> it around to compare performance.
>>>>>
>>>>> Lemme know what folks think of the approach. I'm getting this working
>>>>> for our scale test benchmark and will report back with numbers. Feel free
>>>>> to run your own benchmarks and share.
>>>>>
>>>>> Cheers,
>>>>> -Gautam.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> [1] -
>>>>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>>>>> [2] -
>>>>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>>>>> [3] -
>>>>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
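
For context, the Iceberg-to-Arrow mapping that ArrowSchemaUtil performs looks
roughly like the sketch below, covering only a few primitive types with
nullability simplified; the class linked in [3] above is the actual
implementation.

import java.util.ArrayList;
import java.util.List;

import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Rough sketch: walk the Iceberg schema's columns and translate each primitive
// type to the corresponding Arrow type, producing an Arrow Schema.
public class IcebergToArrowSketch {
  public static org.apache.arrow.vector.types.pojo.Schema convert(Schema icebergSchema) {
    List<Field> fields = new ArrayList<>();
    for (Types.NestedField field : icebergSchema.columns()) {
      ArrowType arrowType;
      switch (field.type().typeId()) {
        case INTEGER:
          arrowType = new ArrowType.Int(32, true);   // 32-bit signed int
          break;
        case LONG:
          arrowType = new ArrowType.Int(64, true);   // 64-bit signed int
          break;
        case DOUBLE:
          arrowType = new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
          break;
        case STRING:
          arrowType = new ArrowType.Utf8();          // UTF-8 encoded strings
          break;
        default:
          throw new UnsupportedOperationException("Unsupported type: " + field.type());
      }
      // Nullable for simplicity; the real conversion honors field.isOptional().
      fields.add(Field.nullable(field.name(), arrowType));
    }
    return new org.apache.arrow.vector.types.pojo.Schema(fields);
  }
}
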
>>>>>
>>>>>
>>>>> On Mon, Jul 22, 2019 at 2:33 PM Gautam <ga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Will do. Doing a bit of housekeeping on the code and also adding more
>>>>>> primitive type support.
>>>>>>
>>>>>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah <mc...@palantir.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Would it be possible to put the work in progress code in open source?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *From: *Gautam <ga...@gmail.com>
>>>>>>> *Reply-To: *"dev@iceberg.apache.org" <de...@iceberg.apache.org>
>>>>>>> *Date: *Monday, July 22, 2019 at 9:46 AM
>>>>>>> *To: *Daniel Weeks <dw...@netflix.com>
>>>>>>> *Cc: *Ryan Blue <rb...@netflix.com>, Iceberg Dev List <
>>>>>>> dev@iceberg.apache.org>
>>>>>>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> That would be great!
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks <dw...@netflix.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hey Gautam,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> We also have a couple people looking into vectorized reading (into
>>>>>>> Arrow memory).  I think it would be good for us to get together and see if
>>>>>>> we can collaborate on a common approach for this.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I'll reach out directly and see if we can get together.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> -Dan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Jul 21, 2019 at 10:35 PM Gautam <ga...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Figured this out. I'm returning ColumnarBatch iterator directly
>>>>>>> without projection with schema set appropriately in `readSchema() `.. the
>>>>>>> empty result was due to valuesRead not being set correctly on FileIterator.
>>>>>>> Did that and things are working. Will circle back with numbers soon.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jul 19, 2019 at 5:22 PM Gautam <ga...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hey Guys,
>>>>>>>
>>>>>>>            Sorry bout the delay on this. Just got back on getting a
>>>>>>> basic working implementation in Iceberg for Vectorization on primitive
>>>>>>> types.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *Here's what I have so far :  *
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I have added `ParquetValueReader` implementations for some basic
>>>>>>> primitive types that build the respective Arrow Vector (`ValueVector`) viz.
>>>>>>> `IntVector` for int, `VarCharVector` for strings and so on. Underneath each
>>>>>>> value vector reader there are column iterators that read from the parquet
>>>>>>> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
>>>>>>> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
>>>>>>> stitched together using a `ColumnarBatchReader` (which as the name suggests
>>>>>>> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
>>>>>>> work properly with the underlying interfaces.  I'v also made changes to
>>>>>>> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
>>>>>>> `SupportsScanColumnarBatch` mixin to the reader).  So the reader now
>>>>>>> expects ColumnarBatch instances (instead of InternalRow). The query
>>>>>>> planning runtime works fine with these changes.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Although it fails during query execution, the bit it's  currently
>>>>>>> failing at is this line of code : https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> This code, I think,  tries to apply the iterator's schema projection
>>>>>>> on the InternalRow instances. This seems to be tightly coupled to
>>>>>>> InternalRow as Spark's catalyst expressions have implemented the
>>>>>>> UnsafeProjection for InternalRow only. If I take this out and just return
>>>>>>> the `Iterator<ColumnarBatch>` iterator I built it returns empty result on
>>>>>>> the client. I'm guessing this is coz Spark is unaware of the iterator's
>>>>>>> schema? There's a Todo in the code that says "*remove the
>>>>>>> projection by reporting the iterator's schema back to Spark*".  Is
>>>>>>> there a simple way to communicate that to Spark for my new iterator? Any
>>>>>>> pointers on how to get around this?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Thanks and Regards,
>>>>>>>
>>>>>>> -Gautam.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com> wrote:
>>>>>>>
>>>>>>> Replies inline.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jun 14, 2019 at 1:11 AM Gautam <ga...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Thanks for responding Ryan,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Couple of follow up questions on ParquetValueReader for Arrow..
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I'd like to start with testing Arrow out with readers for primitive
>>>>>>> type and incrementally add in Struct/Array support, also ArrowWriter [1]
>>>>>>> currently doesn't have converters for map type. How can I default these
>>>>>>> types to regular materialization whilst supporting Arrow based support for
>>>>>>> primitives?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> We should look at what Spark does to handle maps.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I think we should get the prototype working with test cases that
>>>>>>> don't have maps, structs, or lists. Just getting primitives working is a
>>>>>>> good start and just won't hit these problems.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Lemme know if this makes sense...
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> - I extend  PrimitiveReader (for Arrow) that loads primitive types
>>>>>>> into ArrowColumnVectors of corresponding column types by iterating over
>>>>>>> underlying ColumnIterator *n times*, where n is size of batch.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Sounds good to me. I'm not sure about extending vs wrapping because
>>>>>>> I'm not too familiar with the Arrow APIs.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> - Reader.newParquetIterable()  maps primitive column types to the
>>>>>>> newly added ArrowParquetValueReader but for other types (nested types,
>>>>>>> etc.) uses current *InternalRow* based ValueReaders
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Sounds good for primitives, but I would just leave the nested types
>>>>>>> un-implemented for now.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> - Stitch the columns vectors together to create ColumnarBatch,
>>>>>>> (Since *SupportsScanColumnarBatch* mixin currently expects this )
>>>>>>> .. *although* *I'm a bit lost on how the stitching of columns
>>>>>>> happens currently*? .. and how the ArrowColumnVectors could  be
>>>>>>> stitched alongside regular columns that don't have arrow based support ?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I don't think that you can mix regular columns and Arrow columns. It
>>>>>>> has to be all one or the other. That's why it's easier to start with
>>>>>>> primitives, then add structs, then lists, and finally maps.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so
>>>>>>> that DataSourceV2ScanExec starts using ColumnarBatch scans
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> We will probably need two paths. One for columnar batches and one
>>>>>>> for row-based reads. That doesn't need to be done right away and what you
>>>>>>> already have in your working copy makes sense as a start.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> That's a lot of questions! :-) but hope i'm making sense.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> -Gautam.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> [1] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Ryan Blue
>>>>>>>
>>>>>>> Software Engineer
>>>>>>>
>>>>>>> Netflix
>>>>>>>
>>>>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>

Re: Approaching Vectorized Reading in Iceberg ..

Posted by Daniel Weeks <dw...@netflix.com.INVALID>.
Gautam, you need to have the jmh-core libraries available to run.  I
validated that PR, so I'm guessing I had it configured in my environment.

I assume there's a way to make that available within gradle, so I'll take a
look.

On Fri, Jul 26, 2019 at 2:52 PM Gautam <ga...@gmail.com> wrote:

> This fails on master too btw. Just wondering if i'm doing something wrong
> trying to run this.
>
> On Fri, Jul 26, 2019 at 2:24 PM Gautam <ga...@gmail.com> wrote:
>
>> I'v been trying to run the jmh benchmarks bundled within the project. I'v
>> been running into issues with that .. have other hit this? Am I running
>> these incorrectly?
>>
>>
>> bash-3.2$ ./gradlew :iceberg-spark:jmh
>> -PjmhIncludeRegex=IcebergSourceFlatParquetDataFilterBenchmark
>> -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-filter-benchmark-result.txt
>> ..
>> ...
>> > Task :iceberg-spark:jmhCompileGeneratedClasses FAILED
>> error: plug-in not found: ErrorProne
>>
>> FAILURE: Build failed with an exception.
>>
>>
>>
>> Is there a config/plugin I need to add to build.gradle?
>>
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Jul 24, 2019 at 2:03 PM Ryan Blue <rb...@netflix.com> wrote:
>>
>>> Thanks Gautam!
>>>
>>> We'll start taking a look at your code. What do you think about creating
>>> a branch in the Iceberg repository where we can work on improving it
>>> together, before merging it into master?
>>>
>>> Also, you mentioned performance comparisons. Do you have any early
>>> results to share?
>>>
>>> rb
>>>
>>> On Tue, Jul 23, 2019 at 3:40 PM Gautam <ga...@gmail.com> wrote:
>>>
>>>> Hello Folks,
>>>>
>>>> I have checked in a WIP branch [1] with a working version of Vectorized
>>>> reads for Iceberg reader. Here's the diff  [2].
>>>>
>>>> *Implementation Notes:*
>>>>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to
>>>> instruct the DataSourceV2ScanExec to use `planBatchPartitions()` instead of
>>>> the usual `planInputPartitions()`. It returns instances of `ColumnarBatch`
>>>> on each iteration.
>>>>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion. This
>>>> was copied from [3] . Added by @Daniel Weeks <dw...@netflix.com> .
>>>> Thanks for that!
>>>>  - `VectorizedParquetValueReaders` contains ParquetValueReaders used
>>>> for reading/decoding the Parquet rowgroups (aka pagestores as referred to
>>>> in the code)
>>>>  - `VectorizedSparkParquetReaders` contains the visitor implementations
>>>> to map Parquet types to appropriate value readers. I implemented the struct
>>>> visitor so that the root schema can be mapped properly. This has the added
>>>> benefit of vectorization support for structs, so yay!
>>>>  - For the initial version the value readers read an entire row group
>>>> into a single Arrow Field Vector. this i'd imagine will require tuning for
>>>> right batch sizing but i'v gone with one batch per rowgroup for now.
>>>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which is
>>>> Spark's ColumnVector implementation backed by Arrow. This is the first
>>>> contact point between Spark and Arrow interfaces.
>>>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch` by
>>>> `ColumnarBatchReader` . This is my replacement for `InternalRowReader`
>>>> which maps Structs to Columnar Batches. This allows us to have nested
>>>> structs where each level of nesting would be a nested columnar batch. Lemme
>>>> know what you think of this approach.
>>>>  - I'v added value readers for all supported primitive types listed in
>>>> `AvroDataTest`. There's a corresponding test for vectorized reader under
>>>> `TestSparkParquetVectorizedReader`
>>>>  - I haven't fixed all the Checkstyle errors so you will have to turn
>>>> checkstyle off in build.gradle. Also skip tests while building.. sorry! :-(
>>>>
>>>> *P.S*. There's some unused code under ArrowReader.java. Ignore this as
>>>> it's not used. This was from my previous impl of Vectorization. I'v kept it
>>>> around to compare performance.
>>>>
>>>> Lemme know what folks think of the approach. I'm getting this working
>>>> for our scale test benchmark and will report back with numbers. Feel free
>>>> to run your own benchmarks and share.
>>>>
>>>> Cheers,
>>>> -Gautam.
>>>>
>>>>
>>>>
>>>>
>>>> [1] -
>>>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>>>> [2] -
>>>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>>>> [3] -
>>>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>>>
>>>>
>>>> On Mon, Jul 22, 2019 at 2:33 PM Gautam <ga...@gmail.com> wrote:
>>>>
>>>>> Will do. Doing a bit of housekeeping on the code and also adding more
>>>>> primitive type support.
>>>>>
>>>>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah <mc...@palantir.com>
>>>>> wrote:
>>>>>
>>>>>> Would it be possible to put the work in progress code in open source?
>>>>>>
>>>>>>
>>>>>>
>>>>>> *From: *Gautam <ga...@gmail.com>
>>>>>> *Reply-To: *"dev@iceberg.apache.org" <de...@iceberg.apache.org>
>>>>>> *Date: *Monday, July 22, 2019 at 9:46 AM
>>>>>> *To: *Daniel Weeks <dw...@netflix.com>
>>>>>> *Cc: *Ryan Blue <rb...@netflix.com>, Iceberg Dev List <
>>>>>> dev@iceberg.apache.org>
>>>>>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>>>>>
>>>>>>
>>>>>>
>>>>>> That would be great!
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks <dw...@netflix.com>
>>>>>> wrote:
>>>>>>
>>>>>> Hey Gautam,
>>>>>>
>>>>>>
>>>>>>
>>>>>> We also have a couple people looking into vectorized reading (into
>>>>>> Arrow memory).  I think it would be good for us to get together and see if
>>>>>> we can collaborate on a common approach for this.
>>>>>>
>>>>>>
>>>>>>
>>>>>> I'll reach out directly and see if we can get together.
>>>>>>
>>>>>>
>>>>>>
>>>>>> -Dan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sun, Jul 21, 2019 at 10:35 PM Gautam <ga...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Figured this out. I'm returning ColumnarBatch iterator directly
>>>>>> without projection with schema set appropriately in `readSchema() `.. the
>>>>>> empty result was due to valuesRead not being set correctly on FileIterator.
>>>>>> Did that and things are working. Will circle back with numbers soon.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Jul 19, 2019 at 5:22 PM Gautam <ga...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Hey Guys,
>>>>>>
>>>>>>            Sorry bout the delay on this. Just got back on getting a
>>>>>> basic working implementation in Iceberg for Vectorization on primitive
>>>>>> types.
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Here's what I have so far :  *
>>>>>>
>>>>>>
>>>>>>
>>>>>> I have added `ParquetValueReader` implementations for some basic
>>>>>> primitive types that build the respective Arrow Vector (`ValueVector`) viz.
>>>>>> `IntVector` for int, `VarCharVector` for strings and so on. Underneath each
>>>>>> value vector reader there are column iterators that read from the parquet
>>>>>> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
>>>>>> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
>>>>>> stitched together using a `ColumnarBatchReader` (which as the name suggests
>>>>>> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
>>>>>> work properly with the underlying interfaces.  I'v also made changes to
>>>>>> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
>>>>>> `SupportsScanColumnarBatch` mixin to the reader).  So the reader now
>>>>>> expects ColumnarBatch instances (instead of InternalRow). The query
>>>>>> planning runtime works fine with these changes.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Although it fails during query execution, the bit it's  currently
>>>>>> failing at is this line of code : https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>>>>>>
>>>>>>
>>>>>>
>>>>>> This code, I think,  tries to apply the iterator's schema projection
>>>>>> on the InternalRow instances. This seems to be tightly coupled to
>>>>>> InternalRow as Spark's catalyst expressions have implemented the
>>>>>> UnsafeProjection for InternalRow only. If I take this out and just return
>>>>>> the `Iterator<ColumnarBatch>` iterator I built it returns empty result on
>>>>>> the client. I'm guessing this is coz Spark is unaware of the iterator's
>>>>>> schema? There's a Todo in the code that says "*remove the projection
>>>>>> by reporting the iterator's schema back to Spark*".  Is there a
>>>>>> simple way to communicate that to Spark for my new iterator? Any pointers
>>>>>> on how to get around this?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks and Regards,
>>>>>>
>>>>>> -Gautam.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com> wrote:
>>>>>>
>>>>>> Replies inline.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Jun 14, 2019 at 1:11 AM Gautam <ga...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Thanks for responding Ryan,
>>>>>>
>>>>>>
>>>>>>
>>>>>> Couple of follow up questions on ParquetValueReader for Arrow..
>>>>>>
>>>>>>
>>>>>>
>>>>>> I'd like to start with testing Arrow out with readers for primitive
>>>>>> type and incrementally add in Struct/Array support, also ArrowWriter [1]
>>>>>> currently doesn't have converters for map type. How can I default these
>>>>>> types to regular materialization whilst supporting Arrow based support for
>>>>>> primitives?
>>>>>>
>>>>>>
>>>>>>
>>>>>> We should look at what Spark does to handle maps.
>>>>>>
>>>>>>
>>>>>>
>>>>>> I think we should get the prototype working with test cases that
>>>>>> don't have maps, structs, or lists. Just getting primitives working is a
>>>>>> good start and just won't hit these problems.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Lemme know if this makes sense...
>>>>>>
>>>>>>
>>>>>>
>>>>>> - I extend  PrimitiveReader (for Arrow) that loads primitive types
>>>>>> into ArrowColumnVectors of corresponding column types by iterating over
>>>>>> underlying ColumnIterator *n times*, where n is size of batch.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Sounds good to me. I'm not sure about extending vs wrapping because
>>>>>> I'm not too familiar with the Arrow APIs.
>>>>>>
>>>>>>
>>>>>>
>>>>>> - Reader.newParquetIterable()  maps primitive column types to the
>>>>>> newly added ArrowParquetValueReader but for other types (nested types,
>>>>>> etc.) uses current *InternalRow* based ValueReaders
>>>>>>
>>>>>>
>>>>>>
>>>>>> Sounds good for primitives, but I would just leave the nested types
>>>>>> un-implemented for now.
>>>>>>
>>>>>>
>>>>>>
>>>>>> - Stitch the columns vectors together to create ColumnarBatch, (Since
>>>>>> *SupportsScanColumnarBatch* mixin currently expects this ) ..
>>>>>> *although* *I'm a bit lost on how the stitching of columns happens
>>>>>> currently*? .. and how the ArrowColumnVectors could  be stitched
>>>>>> alongside regular columns that don't have arrow based support ?
>>>>>>
>>>>>>
>>>>>>
>>>>>> I don't think that you can mix regular columns and Arrow columns. It
>>>>>> has to be all one or the other. That's why it's easier to start with
>>>>>> primitives, then add structs, then lists, and finally maps.
>>>>>>
>>>>>>
>>>>>>
>>>>>> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so
>>>>>> that DataSourceV2ScanExec starts using ColumnarBatch scans
>>>>>>
>>>>>>
>>>>>>
>>>>>> We will probably need two paths. One for columnar batches and one for
>>>>>> row-based reads. That doesn't need to be done right away and what you
>>>>>> already have in your working copy makes sense as a start.
>>>>>>
>>>>>>
>>>>>>
>>>>>> That's a lot of questions! :-) but hope i'm making sense.
>>>>>>
>>>>>>
>>>>>>
>>>>>> -Gautam.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> [1] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Ryan Blue
>>>>>>
>>>>>> Software Engineer
>>>>>>
>>>>>> Netflix
>>>>>>
>>>>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>

Re: Approaching Vectorized Reading in Iceberg ..

Posted by Gautam <ga...@gmail.com>.
This fails on master too, btw. Just wondering if I'm doing something wrong
trying to run this.

On Fri, Jul 26, 2019 at 2:24 PM Gautam <ga...@gmail.com> wrote:

> I'v been trying to run the jmh benchmarks bundled within the project. I'v
> been running into issues with that .. have other hit this? Am I running
> these incorrectly?
>
>
> bash-3.2$ ./gradlew :iceberg-spark:jmh
> -PjmhIncludeRegex=IcebergSourceFlatParquetDataFilterBenchmark
> -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-filter-benchmark-result.txt
> ..
> ...
> > Task :iceberg-spark:jmhCompileGeneratedClasses FAILED
> error: plug-in not found: ErrorProne
>
> FAILURE: Build failed with an exception.
>
>
>
> Is there a config/plugin I need to add to build.gradle?
>
>
>
>
>
>
>
>
> On Wed, Jul 24, 2019 at 2:03 PM Ryan Blue <rb...@netflix.com> wrote:
>
>> Thanks Gautam!
>>
>> We'll start taking a look at your code. What do you think about creating
>> a branch in the Iceberg repository where we can work on improving it
>> together, before merging it into master?
>>
>> Also, you mentioned performance comparisons. Do you have any early
>> results to share?
>>
>> rb
>>
>> On Tue, Jul 23, 2019 at 3:40 PM Gautam <ga...@gmail.com> wrote:
>>
>>> Hello Folks,
>>>
>>> I have checked in a WIP branch [1] with a working version of Vectorized
>>> reads for Iceberg reader. Here's the diff  [2].
>>>
>>> *Implementation Notes:*
>>>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to instruct
>>> the DataSourceV2ScanExec to use `planBatchPartitions()` instead of the
>>> usual `planInputPartitions()`. It returns instances of `ColumnarBatch` on
>>> each iteration.
>>>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion. This was
>>> copied from [3] . Added by @Daniel Weeks <dw...@netflix.com> . Thanks
>>> for that!
>>>  - `VectorizedParquetValueReaders` contains ParquetValueReaders used for
>>> reading/decoding the Parquet rowgroups (aka pagestores as referred to in
>>> the code)
>>>  - `VectorizedSparkParquetReaders` contains the visitor implementations
>>> to map Parquet types to appropriate value readers. I implemented the struct
>>> visitor so that the root schema can be mapped properly. This has the added
>>> benefit of vectorization support for structs, so yay!
>>>  - For the initial version the value readers read an entire row group
>>> into a single Arrow Field Vector. this i'd imagine will require tuning for
>>> right batch sizing but i'v gone with one batch per rowgroup for now.
>>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which is
>>> Spark's ColumnVector implementation backed by Arrow. This is the first
>>> contact point between Spark and Arrow interfaces.
>>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch` by
>>> `ColumnarBatchReader` . This is my replacement for `InternalRowReader`
>>> which maps Structs to Columnar Batches. This allows us to have nested
>>> structs where each level of nesting would be a nested columnar batch. Lemme
>>> know what you think of this approach.
>>>  - I'v added value readers for all supported primitive types listed in
>>> `AvroDataTest`. There's a corresponding test for vectorized reader under
>>> `TestSparkParquetVectorizedReader`
>>>  - I haven't fixed all the Checkstyle errors so you will have to turn
>>> checkstyle off in build.gradle. Also skip tests while building.. sorry! :-(
>>>
>>> *P.S*. There's some unused code under ArrowReader.java. Ignore this as
>>> it's not used. This was from my previous impl of Vectorization. I'v kept it
>>> around to compare performance.
>>>
>>> Lemme know what folks think of the approach. I'm getting this working
>>> for our scale test benchmark and will report back with numbers. Feel free
>>> to run your own benchmarks and share.
>>>
>>> Cheers,
>>> -Gautam.
>>>
>>>
>>>
>>>
>>> [1] -
>>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>>> [2] -
>>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>>> [3] -
>>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>>
>>>
>>> On Mon, Jul 22, 2019 at 2:33 PM Gautam <ga...@gmail.com> wrote:
>>>
>>>> Will do. Doing a bit of housekeeping on the code and also adding more
>>>> primitive type support.
>>>>
>>>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah <mc...@palantir.com> wrote:
>>>>
>>>>> Would it be possible to put the work in progress code in open source?
>>>>>
>>>>>
>>>>>
>>>>> *From: *Gautam <ga...@gmail.com>
>>>>> *Reply-To: *"dev@iceberg.apache.org" <de...@iceberg.apache.org>
>>>>> *Date: *Monday, July 22, 2019 at 9:46 AM
>>>>> *To: *Daniel Weeks <dw...@netflix.com>
>>>>> *Cc: *Ryan Blue <rb...@netflix.com>, Iceberg Dev List <
>>>>> dev@iceberg.apache.org>
>>>>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>>>>
>>>>>
>>>>>
>>>>> That would be great!
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks <dw...@netflix.com>
>>>>> wrote:
>>>>>
>>>>> Hey Gautam,
>>>>>
>>>>>
>>>>>
>>>>> We also have a couple people looking into vectorized reading (into
>>>>> Arrow memory).  I think it would be good for us to get together and see if
>>>>> we can collaborate on a common approach for this.
>>>>>
>>>>>
>>>>>
>>>>> I'll reach out directly and see if we can get together.
>>>>>
>>>>>
>>>>>
>>>>> -Dan
>>>>>
>>>>>
>>>>>
>>>>> On Sun, Jul 21, 2019 at 10:35 PM Gautam <ga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Figured this out. I'm returning ColumnarBatch iterator directly
>>>>> without projection with schema set appropriately in `readSchema() `.. the
>>>>> empty result was due to valuesRead not being set correctly on FileIterator.
>>>>> Did that and things are working. Will circle back with numbers soon.
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jul 19, 2019 at 5:22 PM Gautam <ga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Hey Guys,
>>>>>
>>>>>            Sorry bout the delay on this. Just got back on getting a
>>>>> basic working implementation in Iceberg for Vectorization on primitive
>>>>> types.
>>>>>
>>>>>
>>>>>
>>>>> *Here's what I have so far :  *
>>>>>
>>>>>
>>>>>
>>>>> I have added `ParquetValueReader` implementations for some basic
>>>>> primitive types that build the respective Arrow Vector (`ValueVector`) viz.
>>>>> `IntVector` for int, `VarCharVector` for strings and so on. Underneath each
>>>>> value vector reader there are column iterators that read from the parquet
>>>>> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
>>>>> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
>>>>> stitched together using a `ColumnarBatchReader` (which as the name suggests
>>>>> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
>>>>> work properly with the underlying interfaces.  I'v also made changes to
>>>>> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
>>>>> `SupportsScanColumnarBatch` mixin to the reader).  So the reader now
>>>>> expects ColumnarBatch instances (instead of InternalRow). The query
>>>>> planning runtime works fine with these changes.
>>>>>
>>>>>
>>>>>
>>>>> Although it fails during query execution, the bit it's  currently
>>>>> failing at is this line of code : https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>>>>>
>>>>>
>>>>>
>>>>> This code, I think,  tries to apply the iterator's schema projection
>>>>> on the InternalRow instances. This seems to be tightly coupled to
>>>>> InternalRow as Spark's catalyst expressions have implemented the
>>>>> UnsafeProjection for InternalRow only. If I take this out and just return
>>>>> the `Iterator<ColumnarBatch>` iterator I built it returns empty result on
>>>>> the client. I'm guessing this is coz Spark is unaware of the iterator's
>>>>> schema? There's a Todo in the code that says "*remove the projection
>>>>> by reporting the iterator's schema back to Spark*".  Is there a
>>>>> simple way to communicate that to Spark for my new iterator? Any pointers
>>>>> on how to get around this?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Thanks and Regards,
>>>>>
>>>>> -Gautam.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com> wrote:
>>>>>
>>>>> Replies inline.
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jun 14, 2019 at 1:11 AM Gautam <ga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Thanks for responding Ryan,
>>>>>
>>>>>
>>>>>
>>>>> Couple of follow up questions on ParquetValueReader for Arrow..
>>>>>
>>>>>
>>>>>
>>>>> I'd like to start with testing Arrow out with readers for primitive
>>>>> type and incrementally add in Struct/Array support, also ArrowWriter [1]
>>>>> currently doesn't have converters for map type. How can I default these
>>>>> types to regular materialization whilst supporting Arrow based support for
>>>>> primitives?
>>>>>
>>>>>
>>>>>
>>>>> We should look at what Spark does to handle maps.
>>>>>
>>>>>
>>>>>
>>>>> I think we should get the prototype working with test cases that don't
>>>>> have maps, structs, or lists. Just getting primitives working is a good
>>>>> start and just won't hit these problems.
>>>>>
>>>>>
>>>>>
>>>>> Lemme know if this makes sense...
>>>>>
>>>>>
>>>>>
>>>>> - I extend  PrimitiveReader (for Arrow) that loads primitive types
>>>>> into ArrowColumnVectors of corresponding column types by iterating over
>>>>> underlying ColumnIterator *n times*, where n is size of batch.
>>>>>
>>>>>
>>>>>
>>>>> Sounds good to me. I'm not sure about extending vs wrapping because
>>>>> I'm not too familiar with the Arrow APIs.
>>>>>
>>>>>
>>>>>
>>>>> - Reader.newParquetIterable()  maps primitive column types to the
>>>>> newly added ArrowParquetValueReader but for other types (nested types,
>>>>> etc.) uses current *InternalRow* based ValueReaders
>>>>>
>>>>>
>>>>>
>>>>> Sounds good for primitives, but I would just leave the nested types
>>>>> un-implemented for now.
>>>>>
>>>>>
>>>>>
>>>>> - Stitch the columns vectors together to create ColumnarBatch, (Since
>>>>> *SupportsScanColumnarBatch* mixin currently expects this ) ..
>>>>> *although* *I'm a bit lost on how the stitching of columns happens
>>>>> currently*? .. and how the ArrowColumnVectors could  be stitched
>>>>> alongside regular columns that don't have arrow based support ?
>>>>>
>>>>>
>>>>>
>>>>> I don't think that you can mix regular columns and Arrow columns. It
>>>>> has to be all one or the other. That's why it's easier to start with
>>>>> primitives, then add structs, then lists, and finally maps.
>>>>>
>>>>>
>>>>>
>>>>> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so
>>>>> that DataSourceV2ScanExec starts using ColumnarBatch scans
>>>>>
>>>>>
>>>>>
>>>>> We will probably need two paths. One for columnar batches and one for
>>>>> row-based reads. That doesn't need to be done right away and what you
>>>>> already have in your working copy makes sense as a start.
>>>>>
>>>>>
>>>>>
>>>>> That's a lot of questions! :-) but hope i'm making sense.
>>>>>
>>>>>
>>>>>
>>>>> -Gautam.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> [1] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Ryan Blue
>>>>>
>>>>> Software Engineer
>>>>>
>>>>> Netflix
>>>>>
>>>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

Re: Approaching Vectorized Reading in Iceberg ..

Posted by Gautam <ga...@gmail.com>.
I've been trying to run the jmh benchmarks bundled within the project. I've
been running into issues with that... have others hit this? Am I running
these incorrectly?


bash-3.2$ ./gradlew :iceberg-spark:jmh
-PjmhIncludeRegex=IcebergSourceFlatParquetDataFilterBenchmark
-PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-filter-benchmark-result.txt
..
...
> Task :iceberg-spark:jmhCompileGeneratedClasses FAILED
error: plug-in not found: ErrorProne

FAILURE: Build failed with an exception.



Is there a config/plugin I need to add to build.gradle?








On Wed, Jul 24, 2019 at 2:03 PM Ryan Blue <rb...@netflix.com> wrote:

> Thanks Gautam!
>
> We'll start taking a look at your code. What do you think about creating a
> branch in the Iceberg repository where we can work on improving it
> together, before merging it into master?
>
> Also, you mentioned performance comparisons. Do you have any early results
> to share?
>
> rb
>
> On Tue, Jul 23, 2019 at 3:40 PM Gautam <ga...@gmail.com> wrote:
>
>> Hello Folks,
>>
>> I have checked in a WIP branch [1] with a working version of Vectorized
>> reads for Iceberg reader. Here's the diff  [2].
>>
>> *Implementation Notes:*
>>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to instruct
>> the DataSourceV2ScanExec to use `planBatchPartitions()` instead of the
>> usual `planInputPartitions()`. It returns instances of `ColumnarBatch` on
>> each iteration.
>>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion. This was
>> copied from [3] . Added by @Daniel Weeks <dw...@netflix.com> . Thanks
>> for that!
>>  - `VectorizedParquetValueReaders` contains ParquetValueReaders used for
>> reading/decoding the Parquet rowgroups (aka pagestores as referred to in
>> the code)
>>  - `VectorizedSparkParquetReaders` contains the visitor implementations
>> to map Parquet types to appropriate value readers. I implemented the struct
>> visitor so that the root schema can be mapped properly. This has the added
>> benefit of vectorization support for structs, so yay!
>>  - For the initial version the value readers read an entire row group
>> into a single Arrow Field Vector. this i'd imagine will require tuning for
>> right batch sizing but i'v gone with one batch per rowgroup for now.
>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which is
>> Spark's ColumnVector implementation backed by Arrow. This is the first
>> contact point between Spark and Arrow interfaces.
>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch` by
>> `ColumnarBatchReader` . This is my replacement for `InternalRowReader`
>> which maps Structs to Columnar Batches. This allows us to have nested
>> structs where each level of nesting would be a nested columnar batch. Lemme
>> know what you think of this approach.
>>  - I'v added value readers for all supported primitive types listed in
>> `AvroDataTest`. There's a corresponding test for vectorized reader under
>> `TestSparkParquetVectorizedReader`
>>  - I haven't fixed all the Checkstyle errors so you will have to turn
>> checkstyle off in build.gradle. Also skip tests while building.. sorry! :-(
>>
>> *P.S*. There's some unused code under ArrowReader.java. Ignore this as
>> it's not used. This was from my previous impl of Vectorization. I'v kept it
>> around to compare performance.
>>
>> Lemme know what folks think of the approach. I'm getting this working for
>> our scale test benchmark and will report back with numbers. Feel free to
>> run your own benchmarks and share.
>>
>> Cheers,
>> -Gautam.
>>
>>
>>
>>
>> [1] -
>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>> [2] -
>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>> [3] -
>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>
>>
>> On Mon, Jul 22, 2019 at 2:33 PM Gautam <ga...@gmail.com> wrote:
>>
>>> Will do. Doing a bit of housekeeping on the code and also adding more
>>> primitive type support.
>>>
>>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah <mc...@palantir.com> wrote:
>>>
>>>> Would it be possible to put the work in progress code in open source?
>>>>
>>>>
>>>>
>>>> *From: *Gautam <ga...@gmail.com>
>>>> *Reply-To: *"dev@iceberg.apache.org" <de...@iceberg.apache.org>
>>>> *Date: *Monday, July 22, 2019 at 9:46 AM
>>>> *To: *Daniel Weeks <dw...@netflix.com>
>>>> *Cc: *Ryan Blue <rb...@netflix.com>, Iceberg Dev List <
>>>> dev@iceberg.apache.org>
>>>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>>>
>>>>
>>>>
>>>> That would be great!
>>>>
>>>>
>>>>
>>>> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks <dw...@netflix.com>
>>>> wrote:
>>>>
>>>> Hey Gautam,
>>>>
>>>>
>>>>
>>>> We also have a couple people looking into vectorized reading (into
>>>> Arrow memory).  I think it would be good for us to get together and see if
>>>> we can collaborate on a common approach for this.
>>>>
>>>>
>>>>
>>>> I'll reach out directly and see if we can get together.
>>>>
>>>>
>>>>
>>>> -Dan
>>>>
>>>>
>>>>
>>>> On Sun, Jul 21, 2019 at 10:35 PM Gautam <ga...@gmail.com>
>>>> wrote:
>>>>
>>>> Figured this out. I'm returning ColumnarBatch iterator directly without
>>>> projection with schema set appropriately in `readSchema() `.. the empty
>>>> result was due to valuesRead not being set correctly on FileIterator. Did
>>>> that and things are working. Will circle back with numbers soon.
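
A tiny sketch of the valuesRead bookkeeping mentioned above, written against
Spark 2.4's InputPartitionReader. The FileIterator details are simplified, and
BatchSource is a made-up stand-in for the vectorized Parquet readers.

import java.io.IOException;

import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.vectorized.ColumnarBatch;

// Hypothetical partition reader: next() keeps producing batches until the
// number of values read reaches the total row count of the split. If
// valuesRead is never advanced, Spark sees the iterator as exhausted (or
// never-ending), which is how the empty-result symptom shows up.
class SketchBatchPartitionReader implements InputPartitionReader<ColumnarBatch> {
  private final long totalValues;     // total rows in this file/split
  private final BatchSource batches;  // hypothetical source of decoded batches
  private long valuesRead = 0L;
  private ColumnarBatch current = null;

  SketchBatchPartitionReader(long totalValues, BatchSource batches) {
    this.totalValues = totalValues;
    this.batches = batches;
  }

  @Override
  public boolean next() {
    if (valuesRead >= totalValues) {
      return false;
    }
    current = batches.readNextBatch();
    valuesRead += current.numRows();  // advance by the rows in this batch
    return true;
  }

  @Override
  public ColumnarBatch get() {
    return current;
  }

  @Override
  public void close() throws IOException {
    batches.close();
  }

  // Made-up interface standing in for the vectorized Parquet value readers.
  interface BatchSource extends java.io.Closeable {
    ColumnarBatch readNextBatch();
  }
}
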
>>>>
>>>>
>>>>
>>>> On Fri, Jul 19, 2019 at 5:22 PM Gautam <ga...@gmail.com> wrote:
>>>>
>>>> Hey Guys,
>>>>
>>>>            Sorry bout the delay on this. Just got back on getting a
>>>> basic working implementation in Iceberg for Vectorization on primitive
>>>> types.
>>>>
>>>>
>>>>
>>>> *Here's what I have so far :  *
>>>>
>>>>
>>>>
>>>> I have added `ParquetValueReader` implementations for some basic
>>>> primitive types that build the respective Arrow Vector (`ValueVector`) viz.
>>>> `IntVector` for int, `VarCharVector` for strings and so on. Underneath each
>>>> value vector reader there are column iterators that read from the parquet
>>>> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
>>>> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
>>>> stitched together using a `ColumnarBatchReader` (which as the name suggests
>>>> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
>>>> work properly with the underlying interfaces.  I'v also made changes to
>>>> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
>>>> `SupportsScanColumnarBatch` mixin to the reader).  So the reader now
>>>> expects ColumnarBatch instances (instead of InternalRow). The query
>>>> planning runtime works fine with these changes.
>>>>
>>>>
>>>>
>>>> Although it fails during query execution, the bit it's  currently
>>>> failing at is this line of code : https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>>>>
>>>>
>>>>
>>>> This code, I think,  tries to apply the iterator's schema projection on
>>>> the InternalRow instances. This seems to be tightly coupled to InternalRow
>>>> as Spark's catalyst expressions have implemented the UnsafeProjection for
>>>> InternalRow only. If I take this out and just return the
>>>> `Iterator<ColumnarBatch>` iterator I built it returns empty result on the
>>>> client. I'm guessing this is coz Spark is unaware of the iterator's schema?
>>>> There's a Todo in the code that says "*remove the projection by
>>>> reporting the iterator's schema back to Spark*".  Is there a simple
>>>> way to communicate that to Spark for my new iterator? Any pointers on how
>>>> to get around this?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Thanks and Regards,
>>>>
>>>> -Gautam.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com> wrote:
>>>>
>>>> Replies inline.
>>>>
>>>>
>>>>
>>>> On Fri, Jun 14, 2019 at 1:11 AM Gautam <ga...@gmail.com> wrote:
>>>>
>>>> Thanks for responding Ryan,
>>>>
>>>>
>>>>
>>>> Couple of follow up questions on ParquetValueReader for Arrow..
>>>>
>>>>
>>>>
>>>> I'd like to start with testing Arrow out with readers for primitive
>>>> type and incrementally add in Struct/Array support, also ArrowWriter [1]
>>>> currently doesn't have converters for map type. How can I default these
>>>> types to regular materialization whilst supporting Arrow based support for
>>>> primitives?
>>>>
>>>>
>>>>
>>>> We should look at what Spark does to handle maps.
>>>>
>>>>
>>>>
>>>> I think we should get the prototype working with test cases that don't
>>>> have maps, structs, or lists. Just getting primitives working is a good
>>>> start and just won't hit these problems.
>>>>
>>>>
>>>>
>>>> Lemme know if this makes sense...
>>>>
>>>>
>>>>
>>>> - I extend  PrimitiveReader (for Arrow) that loads primitive types into
>>>> ArrowColumnVectors of corresponding column types by iterating over
>>>> underlying ColumnIterator *n times*, where n is size of batch.
>>>>
>>>>
>>>>
>>>> Sounds good to me. I'm not sure about extending vs wrapping because I'm
>>>> not too familiar with the Arrow APIs.
>>>>
>>>>
>>>>
>>>> - Reader.newParquetIterable()  maps primitive column types to the newly
>>>> added ArrowParquetValueReader but for other types (nested types, etc.) uses
>>>> current *InternalRow* based ValueReaders
>>>>
>>>>
>>>>
>>>> Sounds good for primitives, but I would just leave the nested types
>>>> un-implemented for now.
>>>>
>>>>
>>>>
>>>> - Stitch the columns vectors together to create ColumnarBatch, (Since
>>>> *SupportsScanColumnarBatch* mixin currently expects this ) ..
>>>> *although* *I'm a bit lost on how the stitching of columns happens
>>>> currently*? .. and how the ArrowColumnVectors could  be stitched
>>>> alongside regular columns that don't have arrow based support ?
>>>>
>>>>
>>>>
>>>> I don't think that you can mix regular columns and Arrow columns. It
>>>> has to be all one or the other. That's why it's easier to start with
>>>> primitives, then add structs, then lists, and finally maps.
>>>>
>>>>
>>>>
>>>> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so
>>>> that DataSourceV2ScanExec starts using ColumnarBatch scans
>>>>
>>>>
>>>>
>>>> We will probably need two paths. One for columnar batches and one for
>>>> row-based reads. That doesn't need to be done right away and what you
>>>> already have in your working copy makes sense as a start.
>>>>
>>>>
>>>>
>>>> That's a lot of questions! :-) but hope i'm making sense.
>>>>
>>>>
>>>>
>>>> -Gautam.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> [1] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Ryan Blue
>>>>
>>>> Software Engineer
>>>>
>>>> Netflix
>>>>
>>>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Re: Approaching Vectorized Reading in Iceberg ..

Posted by Daniel Weeks <dw...@netflix.com.INVALID>.
Gautam,

I've created a branch off current master:
https://github.com/apache/incubator-iceberg/tree/vectorized-read

I've also created a milestone, so feel free to add issues and we can
associate them with the milestone:
https://github.com/apache/incubator-iceberg/milestone/2

-Dan

On Wed, Jul 24, 2019 at 4:21 PM Gautam <ga...@gmail.com> wrote:

> +1 on having a branch. Lemme know once you do i'l rebase and open a PR
> against it.
>
> Will get back to you on perf numbers soon.
>
> On Wed, Jul 24, 2019 at 2:03 PM Ryan Blue <rb...@netflix.com> wrote:
>
>> Thanks Gautam!
>>
>> We'll start taking a look at your code. What do you think about creating
>> a branch in the Iceberg repository where we can work on improving it
>> together, before merging it into master?
>>
>> Also, you mentioned performance comparisons. Do you have any early
>> results to share?
>>
>> rb
>>
>> On Tue, Jul 23, 2019 at 3:40 PM Gautam <ga...@gmail.com> wrote:
>>
>>> Hello Folks,
>>>
>>> I have checked in a WIP branch [1] with a working version of Vectorized
>>> reads for Iceberg reader. Here's the diff  [2].
>>>
>>> *Implementation Notes:*
>>>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to instruct
>>> the DataSourceV2ScanExec to use `planBatchPartitions()` instead of the
>>> usual `planInputPartitions()`. It returns instances of `ColumnarBatch` on
>>> each iteration.
>>>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion. This was
>>> copied from [3] . Added by @Daniel Weeks <dw...@netflix.com> . Thanks
>>> for that!
>>>  - `VectorizedParquetValueReaders` contains ParquetValueReaders used for
>>> reading/decoding the Parquet rowgroups (aka pagestores as referred to in
>>> the code)
>>>  - `VectorizedSparkParquetReaders` contains the visitor implementations
>>> to map Parquet types to appropriate value readers. I implemented the struct
>>> visitor so that the root schema can be mapped properly. This has the added
>>> benefit of vectorization support for structs, so yay!
>>>  - For the initial version the value readers read an entire row group
>>> into a single Arrow Field Vector. this i'd imagine will require tuning for
>>> right batch sizing but i'v gone with one batch per rowgroup for now.
>>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which is
>>> Spark's ColumnVector implementation backed by Arrow. This is the first
>>> contact point between Spark and Arrow interfaces.
>>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch` by
>>> `ColumnarBatchReader` . This is my replacement for `InternalRowReader`
>>> which maps Structs to Columnar Batches. This allows us to have nested
>>> structs where each level of nesting would be a nested columnar batch. Lemme
>>> know what you think of this approach.
>>>  - I'v added value readers for all supported primitive types listed in
>>> `AvroDataTest`. There's a corresponding test for vectorized reader under
>>> `TestSparkParquetVectorizedReader`
>>>  - I haven't fixed all the Checkstyle errors so you will have to turn
>>> checkstyle off in build.gradle. Also skip tests while building.. sorry! :-(
>>>
>>> *P.S*. There's some unused code under ArrowReader.java. Ignore this as
>>> it's not used. This was from my previous impl of Vectorization. I'v kept it
>>> around to compare performance.
>>>
>>> Lemme know what folks think of the approach. I'm getting this working
>>> for our scale test benchmark and will report back with numbers. Feel free
>>> to run your own benchmarks and share.
>>>
>>> Cheers,
>>> -Gautam.
>>>
>>>
>>>
>>>
>>> [1] -
>>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>>> [2] -
>>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>>> [3] -
>>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>>
>>>
>>> On Mon, Jul 22, 2019 at 2:33 PM Gautam <ga...@gmail.com> wrote:
>>>
>>>> Will do. Doing a bit of housekeeping on the code and also adding more
>>>> primitive type support.
>>>>
>>>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah <mc...@palantir.com> wrote:
>>>>
>>>>> Would it be possible to put the work in progress code in open source?
>>>>>
>>>>>
>>>>>
>>>>> *From: *Gautam <ga...@gmail.com>
>>>>> *Reply-To: *"dev@iceberg.apache.org" <de...@iceberg.apache.org>
>>>>> *Date: *Monday, July 22, 2019 at 9:46 AM
>>>>> *To: *Daniel Weeks <dw...@netflix.com>
>>>>> *Cc: *Ryan Blue <rb...@netflix.com>, Iceberg Dev List <
>>>>> dev@iceberg.apache.org>
>>>>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>>>>
>>>>>
>>>>>
>>>>> That would be great!
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks <dw...@netflix.com>
>>>>> wrote:
>>>>>
>>>>> Hey Gautam,
>>>>>
>>>>>
>>>>>
>>>>> We also have a couple people looking into vectorized reading (into
>>>>> Arrow memory).  I think it would be good for us to get together and see if
>>>>> we can collaborate on a common approach for this.
>>>>>
>>>>>
>>>>>
>>>>> I'll reach out directly and see if we can get together.
>>>>>
>>>>>
>>>>>
>>>>> -Dan
>>>>>
>>>>>
>>>>>
>>>>> On Sun, Jul 21, 2019 at 10:35 PM Gautam <ga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Figured this out. I'm returning ColumnarBatch iterator directly
>>>>> without projection with schema set appropriately in `readSchema() `.. the
>>>>> empty result was due to valuesRead not being set correctly on FileIterator.
>>>>> Did that and things are working. Will circle back with numbers soon.
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jul 19, 2019 at 5:22 PM Gautam <ga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Hey Guys,
>>>>>
>>>>>            Sorry bout the delay on this. Just got back on getting a
>>>>> basic working implementation in Iceberg for Vectorization on primitive
>>>>> types.
>>>>>
>>>>>
>>>>>
>>>>> *Here's what I have so far :  *
>>>>>
>>>>>
>>>>>
>>>>> I have added `ParquetValueReader` implementations for some basic
>>>>> primitive types that build the respective Arrow Vector (`ValueVector`) viz.
>>>>> `IntVector` for int, `VarCharVector` for strings and so on. Underneath each
>>>>> value vector reader there are column iterators that read from the parquet
>>>>> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
>>>>> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
>>>>> stitched together using a `ColumnarBatchReader` (which as the name suggests
>>>>> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
>>>>> work properly with the underlying interfaces.  I'v also made changes to
>>>>> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
>>>>> `SupportsScanColumnarBatch` mixin to the reader).  So the reader now
>>>>> expects ColumnarBatch instances (instead of InternalRow). The query
>>>>> planning runtime works fine with these changes.
>>>>>
>>>>>
>>>>>
>>>>> Although it fails during query execution, the bit it's  currently
>>>>> failing at is this line of code : https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>>>>>
>>>>>
>>>>>
>>>>> This code, I think,  tries to apply the iterator's schema projection
>>>>> on the InternalRow instances. This seems to be tightly coupled to
>>>>> InternalRow as Spark's catalyst expressions have implemented the
>>>>> UnsafeProjection for InternalRow only. If I take this out and just return
>>>>> the `Iterator<ColumnarBatch>` iterator I built it returns empty result on
>>>>> the client. I'm guessing this is coz Spark is unaware of the iterator's
>>>>> schema? There's a Todo in the code that says "*remove the projection
>>>>> by reporting the iterator's schema back to Spark*".  Is there a
>>>>> simple way to communicate that to Spark for my new iterator? Any pointers
>>>>> on how to get around this?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Thanks and Regards,
>>>>>
>>>>> -Gautam.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com> wrote:
>>>>>
>>>>> Replies inline.
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jun 14, 2019 at 1:11 AM Gautam <ga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Thanks for responding Ryan,
>>>>>
>>>>>
>>>>>
>>>>> Couple of follow up questions on ParquetValueReader for Arrow..
>>>>>
>>>>>
>>>>>
>>>>> I'd like to start with testing Arrow out with readers for primitive
>>>>> type and incrementally add in Struct/Array support, also ArrowWriter [1]
>>>>> currently doesn't have converters for map type. How can I default these
>>>>> types to regular materialization whilst supporting Arrow based support for
>>>>> primitives?
>>>>>
>>>>>
>>>>>
>>>>> We should look at what Spark does to handle maps.
>>>>>
>>>>>
>>>>>
>>>>> I think we should get the prototype working with test cases that don't
>>>>> have maps, structs, or lists. Just getting primitives working is a good
>>>>> start and just won't hit these problems.
>>>>>
>>>>>
>>>>>
>>>>> Lemme know if this makes sense...
>>>>>
>>>>>
>>>>>
>>>>> - I extend  PrimitiveReader (for Arrow) that loads primitive types
>>>>> into ArrowColumnVectors of corresponding column types by iterating over
>>>>> underlying ColumnIterator *n times*, where n is size of batch.
>>>>>
>>>>>
>>>>>
>>>>> Sounds good to me. I'm not sure about extending vs wrapping because
>>>>> I'm not too familiar with the Arrow APIs.
>>>>>
>>>>>
>>>>>
>>>>> - Reader.newParquetIterable()  maps primitive column types to the
>>>>> newly added ArrowParquetValueReader but for other types (nested types,
>>>>> etc.) uses current *InternalRow* based ValueReaders
>>>>>
>>>>>
>>>>>
>>>>> Sounds good for primitives, but I would just leave the nested types
>>>>> un-implemented for now.
>>>>>
>>>>>
>>>>>
>>>>> - Stitch the columns vectors together to create ColumnarBatch, (Since
>>>>> *SupportsScanColumnarBatch* mixin currently expects this ) ..
>>>>> *although* *I'm a bit lost on how the stitching of columns happens
>>>>> currently*? .. and how the ArrowColumnVectors could  be stitched
>>>>> alongside regular columns that don't have arrow based support ?
>>>>>
>>>>>
>>>>>
>>>>> I don't think that you can mix regular columns and Arrow columns. It
>>>>> has to be all one or the other. That's why it's easier to start with
>>>>> primitives, then add structs, then lists, and finally maps.
>>>>>
>>>>>
>>>>>
>>>>> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so
>>>>> that DataSourceV2ScanExec starts using ColumnarBatch scans
>>>>>
>>>>>
>>>>>
>>>>> We will probably need two paths. One for columnar batches and one for
>>>>> row-based reads. That doesn't need to be done right away and what you
>>>>> already have in your working copy makes sense as a start.
>>>>>
>>>>>
>>>>>
>>>>> That's a lot of questions! :-) but hope i'm making sense.
>>>>>
>>>>>
>>>>>
>>>>> -Gautam.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> [1] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Ryan Blue
>>>>>
>>>>> Software Engineer
>>>>>
>>>>> Netflix
>>>>>
>>>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

Re: Approaching Vectorized Reading in Iceberg ..

Posted by Gautam <ga...@gmail.com>.
+1 on having a branch. Lemme know once you do and I'll rebase and open a PR
against it.

Will get back to you on perf numbers soon.

On Wed, Jul 24, 2019 at 2:03 PM Ryan Blue <rb...@netflix.com> wrote:

> Thanks Gautam!
>
> We'll start taking a look at your code. What do you think about creating a
> branch in the Iceberg repository where we can work on improving it
> together, before merging it into master?
>
> Also, you mentioned performance comparisons. Do you have any early results
> to share?
>
> rb
>
> On Tue, Jul 23, 2019 at 3:40 PM Gautam <ga...@gmail.com> wrote:
>
>> Hello Folks,
>>
>> I have checked in a WIP branch [1] with a working version of Vectorized
>> reads for Iceberg reader. Here's the diff  [2].
>>
>> *Implementation Notes:*
>>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to instruct
>> the DataSourceV2ScanExec to use `planBatchPartitions()` instead of the
>> usual `planInputPartitions()`. It returns instances of `ColumnarBatch` on
>> each iteration.
>>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion. This was
>> copied from [3] . Added by @Daniel Weeks <dw...@netflix.com> . Thanks
>> for that!
>>  - `VectorizedParquetValueReaders` contains ParquetValueReaders used for
>> reading/decoding the Parquet rowgroups (aka pagestores as referred to in
>> the code)
>>  - `VectorizedSparkParquetReaders` contains the visitor implementations
>> to map Parquet types to appropriate value readers. I implemented the struct
>> visitor so that the root schema can be mapped properly. This has the added
>> benefit of vectorization support for structs, so yay!
>>  - For the initial version the value readers read an entire row group
>> into a single Arrow Field Vector. this i'd imagine will require tuning for
>> right batch sizing but i'v gone with one batch per rowgroup for now.
>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which is
>> Spark's ColumnVector implementation backed by Arrow. This is the first
>> contact point between Spark and Arrow interfaces.
>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch` by
>> `ColumnarBatchReader` . This is my replacement for `InternalRowReader`
>> which maps Structs to Columnar Batches. This allows us to have nested
>> structs where each level of nesting would be a nested columnar batch. Lemme
>> know what you think of this approach.
>>  - I'v added value readers for all supported primitive types listed in
>> `AvroDataTest`. There's a corresponding test for vectorized reader under
>> `TestSparkParquetVectorizedReader`
>>  - I haven't fixed all the Checkstyle errors so you will have to turn
>> checkstyle off in build.gradle. Also skip tests while building.. sorry! :-(
>>
>> *P.S*. There's some unused code under ArrowReader.java. Ignore this as
>> it's not used. This was from my previous impl of Vectorization. I'v kept it
>> around to compare performance.
>>
>> Lemme know what folks think of the approach. I'm getting this working for
>> our scale test benchmark and will report back with numbers. Feel free to
>> run your own benchmarks and share.
>>
>> Cheers,
>> -Gautam.
>>
>>
>>
>>
>> [1] -
>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>> [2] -
>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>> [3] -
>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>
>>
>> On Mon, Jul 22, 2019 at 2:33 PM Gautam <ga...@gmail.com> wrote:
>>
>>> Will do. Doing a bit of housekeeping on the code and also adding more
>>> primitive type support.
>>>
>>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah <mc...@palantir.com> wrote:
>>>
>>>> Would it be possible to put the work in progress code in open source?
>>>>
>>>>
>>>>
>>>> *From: *Gautam <ga...@gmail.com>
>>>> *Reply-To: *"dev@iceberg.apache.org" <de...@iceberg.apache.org>
>>>> *Date: *Monday, July 22, 2019 at 9:46 AM
>>>> *To: *Daniel Weeks <dw...@netflix.com>
>>>> *Cc: *Ryan Blue <rb...@netflix.com>, Iceberg Dev List <
>>>> dev@iceberg.apache.org>
>>>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>>>
>>>>
>>>>
>>>> That would be great!
>>>>
>>>>
>>>>
>>>> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks <dw...@netflix.com>
>>>> wrote:
>>>>
>>>> Hey Gautam,
>>>>
>>>>
>>>>
>>>> We also have a couple people looking into vectorized reading (into
>>>> Arrow memory).  I think it would be good for us to get together and see if
>>>> we can collaborate on a common approach for this.
>>>>
>>>>
>>>>
>>>> I'll reach out directly and see if we can get together.
>>>>
>>>>
>>>>
>>>> -Dan
>>>>
>>>>
>>>>
>>>> On Sun, Jul 21, 2019 at 10:35 PM Gautam <ga...@gmail.com>
>>>> wrote:
>>>>
>>>> Figured this out. I'm returning ColumnarBatch iterator directly without
>>>> projection with schema set appropriately in `readSchema() `.. the empty
>>>> result was due to valuesRead not being set correctly on FileIterator. Did
>>>> that and things are working. Will circle back with numbers soon.
>>>>
>>>>
>>>>
>>>> On Fri, Jul 19, 2019 at 5:22 PM Gautam <ga...@gmail.com> wrote:
>>>>
>>>> Hey Guys,
>>>>
>>>>            Sorry bout the delay on this. Just got back on getting a
>>>> basic working implementation in Iceberg for Vectorization on primitive
>>>> types.
>>>>
>>>>
>>>>
>>>> *Here's what I have so far :  *
>>>>
>>>>
>>>>
>>>> I have added `ParquetValueReader` implementations for some basic
>>>> primitive types that build the respective Arrow Vector (`ValueVector`) viz.
>>>> `IntVector` for int, `VarCharVector` for strings and so on. Underneath each
>>>> value vector reader there are column iterators that read from the parquet
>>>> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
>>>> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
>>>> stitched together using a `ColumnarBatchReader` (which as the name suggests
>>>> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
>>>> work properly with the underlying interfaces.  I'v also made changes to
>>>> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
>>>> `SupportsScanColumnarBatch` mixin to the reader).  So the reader now
>>>> expects ColumnarBatch instances (instead of InternalRow). The query
>>>> planning runtime works fine with these changes.
>>>>
>>>>
>>>>
>>>> Although it fails during query execution, the bit it's  currently
>>>> failing at is this line of code : https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>>>>
>>>>
>>>>
>>>> This code, I think,  tries to apply the iterator's schema projection on
>>>> the InternalRow instances. This seems to be tightly coupled to InternalRow
>>>> as Spark's catalyst expressions have implemented the UnsafeProjection for
>>>> InternalRow only. If I take this out and just return the
>>>> `Iterator<ColumnarBatch>` iterator I built it returns empty result on the
>>>> client. I'm guessing this is coz Spark is unaware of the iterator's schema?
>>>> There's a Todo in the code that says "*remove the projection by
>>>> reporting the iterator's schema back to Spark*".  Is there a simple
>>>> way to communicate that to Spark for my new iterator? Any pointers on how
>>>> to get around this?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Thanks and Regards,
>>>>
>>>> -Gautam.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com> wrote:
>>>>
>>>> Replies inline.
>>>>
>>>>
>>>>
>>>> On Fri, Jun 14, 2019 at 1:11 AM Gautam <ga...@gmail.com> wrote:
>>>>
>>>> Thanks for responding Ryan,
>>>>
>>>>
>>>>
>>>> Couple of follow up questions on ParquetValueReader for Arrow..
>>>>
>>>>
>>>>
>>>> I'd like to start with testing Arrow out with readers for primitive
>>>> type and incrementally add in Struct/Array support, also ArrowWriter [1]
>>>> currently doesn't have converters for map type. How can I default these
>>>> types to regular materialization whilst supporting Arrow based support for
>>>> primitives?
>>>>
>>>>
>>>>
>>>> We should look at what Spark does to handle maps.
>>>>
>>>>
>>>>
>>>> I think we should get the prototype working with test cases that don't
>>>> have maps, structs, or lists. Just getting primitives working is a good
>>>> start and just won't hit these problems.
>>>>
>>>>
>>>>
>>>> Lemme know if this makes sense...
>>>>
>>>>
>>>>
>>>> - I extend  PrimitiveReader (for Arrow) that loads primitive types into
>>>> ArrowColumnVectors of corresponding column types by iterating over
>>>> underlying ColumnIterator *n times*, where n is size of batch.
>>>>
>>>>
>>>>
>>>> Sounds good to me. I'm not sure about extending vs wrapping because I'm
>>>> not too familiar with the Arrow APIs.
>>>>
>>>>
>>>>
>>>> - Reader.newParquetIterable()  maps primitive column types to the newly
>>>> added ArrowParquetValueReader but for other types (nested types, etc.) uses
>>>> current *InternalRow* based ValueReaders
>>>>
>>>>
>>>>
>>>> Sounds good for primitives, but I would just leave the nested types
>>>> un-implemented for now.
>>>>
>>>>
>>>>
>>>> - Stitch the columns vectors together to create ColumnarBatch, (Since
>>>> *SupportsScanColumnarBatch* mixin currently expects this ) ..
>>>> *although* *I'm a bit lost on how the stitching of columns happens
>>>> currently*? .. and how the ArrowColumnVectors could  be stitched
>>>> alongside regular columns that don't have arrow based support ?
>>>>
>>>>
>>>>
>>>> I don't think that you can mix regular columns and Arrow columns. It
>>>> has to be all one or the other. That's why it's easier to start with
>>>> primitives, then add structs, then lists, and finally maps.
>>>>
>>>>
>>>>
>>>> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so
>>>> that DataSourceV2ScanExec starts using ColumnarBatch scans
>>>>
>>>>
>>>>
>>>> We will probably need two paths. One for columnar batches and one for
>>>> row-based reads. That doesn't need to be done right away and what you
>>>> already have in your working copy makes sense as a start.
>>>>
>>>>
>>>>
>>>> That's a lot of questions! :-) but hope i'm making sense.
>>>>
>>>>
>>>>
>>>> -Gautam.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> [1] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Ryan Blue
>>>>
>>>> Software Engineer
>>>>
>>>> Netflix
>>>>
>>>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Re: Approaching Vectorized Reading in Iceberg ..

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Thanks Gautam!

We'll start taking a look at your code. What do you think about creating a
branch in the Iceberg repository where we can work on improving it
together, before merging it into master?

Also, you mentioned performance comparisons. Do you have any early results
to share?

rb

On Tue, Jul 23, 2019 at 3:40 PM Gautam <ga...@gmail.com> wrote:

> Hello Folks,
>
> I have checked in a WIP branch [1] with a working version of Vectorized
> reads for Iceberg reader. Here's the diff  [2].
>
> *Implementation Notes:*
>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to instruct
> the DataSourceV2ScanExec to use `planBatchPartitions()` instead of the
> usual `planInputPartitions()`. It returns instances of `ColumnarBatch` on
> each iteration.
>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion. This was
> copied from [3] . Added by @Daniel Weeks <dw...@netflix.com> . Thanks
> for that!
>  - `VectorizedParquetValueReaders` contains ParquetValueReaders used for
> reading/decoding the Parquet rowgroups (aka pagestores as referred to in
> the code)
>  - `VectorizedSparkParquetReaders` contains the visitor implementations to
> map Parquet types to appropriate value readers. I implemented the struct
> visitor so that the root schema can be mapped properly. This has the added
> benefit of vectorization support for structs, so yay!
>  - For the initial version the value readers read an entire row group into
> a single Arrow Field Vector. this i'd imagine will require tuning for right
> batch sizing but i'v gone with one batch per rowgroup for now.
>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which is
> Spark's ColumnVector implementation backed by Arrow. This is the first
> contact point between Spark and Arrow interfaces.
>  - ArrowColumnVectors are stitched together into a `ColumnarBatch` by
> `ColumnarBatchReader` . This is my replacement for `InternalRowReader`
> which maps Structs to Columnar Batches. This allows us to have nested
> structs where each level of nesting would be a nested columnar batch. Lemme
> know what you think of this approach.
>  - I'v added value readers for all supported primitive types listed in
> `AvroDataTest`. There's a corresponding test for vectorized reader under
> `TestSparkParquetVectorizedReader`
>  - I haven't fixed all the Checkstyle errors so you will have to turn
> checkstyle off in build.gradle. Also skip tests while building.. sorry! :-(
>
> *P.S*. There's some unused code under ArrowReader.java. Ignore this as
> it's not used. This was from my previous impl of Vectorization. I'v kept it
> around to compare performance.
>
> Lemme know what folks think of the approach. I'm getting this working for
> our scale test benchmark and will report back with numbers. Feel free to
> run your own benchmarks and share.
>
> Cheers,
> -Gautam.
>
>
>
>
> [1] -
> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
> [2] -
> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
> [3] -
> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>
>
> On Mon, Jul 22, 2019 at 2:33 PM Gautam <ga...@gmail.com> wrote:
>
>> Will do. Doing a bit of housekeeping on the code and also adding more
>> primitive type support.
>>
>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah <mc...@palantir.com> wrote:
>>
>>> Would it be possible to put the work in progress code in open source?
>>>
>>>
>>>
>>> *From: *Gautam <ga...@gmail.com>
>>> *Reply-To: *"dev@iceberg.apache.org" <de...@iceberg.apache.org>
>>> *Date: *Monday, July 22, 2019 at 9:46 AM
>>> *To: *Daniel Weeks <dw...@netflix.com>
>>> *Cc: *Ryan Blue <rb...@netflix.com>, Iceberg Dev List <
>>> dev@iceberg.apache.org>
>>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>>
>>>
>>>
>>> That would be great!
>>>
>>>
>>>
>>> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks <dw...@netflix.com> wrote:
>>>
>>> Hey Gautam,
>>>
>>>
>>>
>>> We also have a couple people looking into vectorized reading (into Arrow
>>> memory).  I think it would be good for us to get together and see if we can
>>> collaborate on a common approach for this.
>>>
>>>
>>>
>>> I'll reach out directly and see if we can get together.
>>>
>>>
>>>
>>> -Dan
>>>
>>>
>>>
>>> On Sun, Jul 21, 2019 at 10:35 PM Gautam <ga...@gmail.com> wrote:
>>>
>>> Figured this out. I'm returning ColumnarBatch iterator directly without
>>> projection with schema set appropriately in `readSchema() `.. the empty
>>> result was due to valuesRead not being set correctly on FileIterator. Did
>>> that and things are working. Will circle back with numbers soon.
>>>
>>>
>>>
>>> On Fri, Jul 19, 2019 at 5:22 PM Gautam <ga...@gmail.com> wrote:
>>>
>>> Hey Guys,
>>>
>>>            Sorry bout the delay on this. Just got back on getting a
>>> basic working implementation in Iceberg for Vectorization on primitive
>>> types.
>>>
>>>
>>>
>>> *Here's what I have so far :  *
>>>
>>>
>>>
>>> I have added `ParquetValueReader` implementations for some basic
>>> primitive types that build the respective Arrow Vector (`ValueVector`) viz.
>>> `IntVector` for int, `VarCharVector` for strings and so on. Underneath each
>>> value vector reader there are column iterators that read from the parquet
>>> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
>>> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
>>> stitched together using a `ColumnarBatchReader` (which as the name suggests
>>> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
>>> work properly with the underlying interfaces.  I'v also made changes to
>>> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
>>> `SupportsScanColumnarBatch` mixin to the reader).  So the reader now
>>> expects ColumnarBatch instances (instead of InternalRow). The query
>>> planning runtime works fine with these changes.
>>>
>>>
>>>
>>> Although it fails during query execution, the bit it's  currently
>>> failing at is this line of code : https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>>>
>>>
>>>
>>> This code, I think,  tries to apply the iterator's schema projection on
>>> the InternalRow instances. This seems to be tightly coupled to InternalRow
>>> as Spark's catalyst expressions have implemented the UnsafeProjection for
>>> InternalRow only. If I take this out and just return the
>>> `Iterator<ColumnarBatch>` iterator I built it returns empty result on the
>>> client. I'm guessing this is coz Spark is unaware of the iterator's schema?
>>> There's a Todo in the code that says "*remove the projection by
>>> reporting the iterator's schema back to Spark*".  Is there a simple way
>>> to communicate that to Spark for my new iterator? Any pointers on how to
>>> get around this?
>>>
>>>
>>>
>>>
>>>
>>> Thanks and Regards,
>>>
>>> -Gautam.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com> wrote:
>>>
>>> Replies inline.
>>>
>>>
>>>
>>> On Fri, Jun 14, 2019 at 1:11 AM Gautam <ga...@gmail.com> wrote:
>>>
>>> Thanks for responding Ryan,
>>>
>>>
>>>
>>> Couple of follow up questions on ParquetValueReader for Arrow..
>>>
>>>
>>>
>>> I'd like to start with testing Arrow out with readers for primitive type
>>> and incrementally add in Struct/Array support, also ArrowWriter [1]
>>> currently doesn't have converters for map type. How can I default these
>>> types to regular materialization whilst supporting Arrow based support for
>>> primitives?
>>>
>>>
>>>
>>> We should look at what Spark does to handle maps.
>>>
>>>
>>>
>>> I think we should get the prototype working with test cases that don't
>>> have maps, structs, or lists. Just getting primitives working is a good
>>> start and just won't hit these problems.
>>>
>>>
>>>
>>> Lemme know if this makes sense...
>>>
>>>
>>>
>>> - I extend  PrimitiveReader (for Arrow) that loads primitive types into
>>> ArrowColumnVectors of corresponding column types by iterating over
>>> underlying ColumnIterator *n times*, where n is size of batch.
>>>
>>>
>>>
>>> Sounds good to me. I'm not sure about extending vs wrapping because I'm
>>> not too familiar with the Arrow APIs.
>>>
>>>
>>>
>>> - Reader.newParquetIterable()  maps primitive column types to the newly
>>> added ArrowParquetValueReader but for other types (nested types, etc.) uses
>>> current *InternalRow* based ValueReaders
>>>
>>>
>>>
>>> Sounds good for primitives, but I would just leave the nested types
>>> un-implemented for now.
>>>
>>>
>>>
>>> - Stitch the columns vectors together to create ColumnarBatch, (Since
>>> *SupportsScanColumnarBatch* mixin currently expects this ) .. *although*
>>>  *I'm a bit lost on how the stitching of columns happens currently*? ..
>>> and how the ArrowColumnVectors could  be stitched alongside regular columns
>>> that don't have arrow based support ?
>>>
>>>
>>>
>>> I don't think that you can mix regular columns and Arrow columns. It has
>>> to be all one or the other. That's why it's easier to start with
>>> primitives, then add structs, then lists, and finally maps.
>>>
>>>
>>>
>>> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so
>>> that DataSourceV2ScanExec starts using ColumnarBatch scans
>>>
>>>
>>>
>>> We will probably need two paths. One for columnar batches and one for
>>> row-based reads. That doesn't need to be done right away and what you
>>> already have in your working copy makes sense as a start.
>>>
>>>
>>>
>>> That's a lot of questions! :-) but hope i'm making sense.
>>>
>>>
>>>
>>> -Gautam.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> [1] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>>
>>>
>>>
>>> --
>>>
>>> Ryan Blue
>>>
>>> Software Engineer
>>>
>>> Netflix
>>>
>>>

-- 
Ryan Blue
Software Engineer
Netflix

Re: Approaching Vectorized Reading in Iceberg ..

Posted by Gautam <ga...@gmail.com>.
Hello Folks,

I have checked in a WIP branch [1] with a working version of Vectorized
reads for Iceberg reader. Here's the diff  [2].

*Implementation Notes:*
 - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to instruct
the DataSourceV2ScanExec to use `planBatchPartitions()` instead of the
usual `planInputPartitions()`. It returns instances of `ColumnarBatch` on
each iteration.
 - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion. This was
copied from [3] and added by @Daniel Weeks <dw...@netflix.com>. Thanks for
that!
 - `VectorizedParquetValueReaders` contains ParquetValueReaders used for
reading/decoding the Parquet rowgroups (aka pagestores as referred to in
the code)
 - `VectorizedSparkParquetReaders` contains the visitor implementations to
map Parquet types to appropriate value readers. I implemented the struct
visitor so that the root schema can be mapped properly. This has the added
benefit of vectorization support for structs, so yay!
 - For the initial version the value readers read an entire row group into
a single Arrow Field Vector. This, I'd imagine, will require tuning to find
the right batch size, but I've gone with one batch per row group for now (a
rough sketch of the per-batch fill loop is included further down this mail).
 - Arrow Field Vectors are wrapped using `ArrowColumnVector` which is
Spark's ColumnVector implementation backed by Arrow. This is the first
contact point between Spark and Arrow interfaces.
 - ArrowColumnVectors are stitched together into a `ColumnarBatch` by
`ColumnarBatchReader`. This is my replacement for `InternalRowReader`, which
maps Structs to Columnar Batches. This allows us to have nested structs,
where each level of nesting would be a nested columnar batch (a rough sketch
of the stitching step follows this list). Lemme know what you think of this
approach.
 - I've added value readers for all supported primitive types listed in
`AvroDataTest`. There's a corresponding test for the vectorized reader under
`TestSparkParquetVectorizedReader`.
 - I haven't fixed all the Checkstyle errors, so you will have to turn
Checkstyle off in build.gradle. Also skip tests while building... sorry! :-(

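To make the stitching step concrete, here's a rough, self-contained sketch
of what `ColumnarBatchReader` does conceptually for a flat projection. The
class and column names are made up for illustration and this is *not* the
code in the WIP branch; it assumes Spark 2.4's public vectorized classes and
the Arrow Java API.

    import java.nio.charset.StandardCharsets;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.IntVector;
    import org.apache.arrow.vector.VarCharVector;
    import org.apache.spark.sql.vectorized.ArrowColumnVector;
    import org.apache.spark.sql.vectorized.ColumnVector;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    public class StitchSketch {
      public static void main(String[] args) {
        try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
          // One Arrow vector per projected column, as the value readers
          // would have filled them from a row group.
          IntVector ids = new IntVector("id", allocator);
          VarCharVector names = new VarCharVector("name", allocator);
          ids.allocateNew(2);
          names.allocateNew(2);
          ids.set(0, 1);
          ids.set(1, 2);
          names.setSafe(0, "a".getBytes(StandardCharsets.UTF_8));
          names.setSafe(1, "b".getBytes(StandardCharsets.UTF_8));
          ids.setValueCount(2);
          names.setValueCount(2);

          // Wrap each Arrow vector in Spark's ArrowColumnVector, line them
          // up in projection order, and hand Spark a batch with the row
          // count set.
          ColumnVector[] columns = new ColumnVector[] {
              new ArrowColumnVector(ids), new ArrowColumnVector(names)};
          ColumnarBatch batch = new ColumnarBatch(columns);
          batch.setNumRows(2);

          System.out.println(batch.getRow(0).getInt(0));  // prints 1
          batch.close();  // releases the underlying Arrow buffers
        }
      }
    }
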
*P.S*. There's some unused code under ArrowReader.java; please ignore it.
It's left over from my previous implementation of vectorization, and I've
kept it around to compare performance.

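And here's the rough shape of the per-batch fill loop inside a primitive
value reader, with a plain java.util.Iterator standing in for the real
Parquet column iterator. Again, the names are illustrative rather than the
branch code, and nulls stand for values that aren't defined at the max
definition level.

    import java.util.Iterator;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.vector.IntVector;

    class IntBatchFiller {
      private final int batchSize;
      private final BufferAllocator allocator;

      IntBatchFiller(int batchSize, BufferAllocator allocator) {
        this.batchSize = batchSize;
        this.allocator = allocator;
      }

      // Pull up to batchSize values from the column iterator into one vector.
      IntVector nextBatch(String name, Iterator<Integer> columnIterator) {
        IntVector vector = new IntVector(name, allocator);
        vector.allocateNew(batchSize);
        int rows = 0;
        while (rows < batchSize && columnIterator.hasNext()) {
          Integer value = columnIterator.next();
          if (value == null) {
            vector.setNull(rows);     // leave the validity bit unset
          } else {
            vector.set(rows, value);  // write the value and mark it valid
          }
          rows += 1;
        }
        vector.setValueCount(rows);   // may be short at a row-group boundary
        return vector;
      }
    }
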
Lemme know what folks think of the approach. I'm getting this working for
our scale test benchmark and will report back with numbers. Feel free to
run your own benchmarks and share.

Cheers,
-Gautam.




[1] -
https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
[2] -
https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
[3] -
https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java


On Mon, Jul 22, 2019 at 2:33 PM Gautam <ga...@gmail.com> wrote:

> Will do. Doing a bit of housekeeping on the code and also adding more
> primitive type support.
>
> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah <mc...@palantir.com> wrote:
>
>> Would it be possible to put the work in progress code in open source?
>>
>>
>>
>> *From: *Gautam <ga...@gmail.com>
>> *Reply-To: *"dev@iceberg.apache.org" <de...@iceberg.apache.org>
>> *Date: *Monday, July 22, 2019 at 9:46 AM
>> *To: *Daniel Weeks <dw...@netflix.com>
>> *Cc: *Ryan Blue <rb...@netflix.com>, Iceberg Dev List <
>> dev@iceberg.apache.org>
>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>
>>
>>
>> That would be great!
>>
>>
>>
>> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks <dw...@netflix.com> wrote:
>>
>> Hey Gautam,
>>
>>
>>
>> We also have a couple people looking into vectorized reading (into Arrow
>> memory).  I think it would be good for us to get together and see if we can
>> collaborate on a common approach for this.
>>
>>
>>
>> I'll reach out directly and see if we can get together.
>>
>>
>>
>> -Dan
>>
>>
>>
>> On Sun, Jul 21, 2019 at 10:35 PM Gautam <ga...@gmail.com> wrote:
>>
>> Figured this out. I'm returning ColumnarBatch iterator directly without
>> projection with schema set appropriately in `readSchema() `.. the empty
>> result was due to valuesRead not being set correctly on FileIterator. Did
>> that and things are working. Will circle back with numbers soon.
>>
>>
>>
>> On Fri, Jul 19, 2019 at 5:22 PM Gautam <ga...@gmail.com> wrote:
>>
>> Hey Guys,
>>
>>            Sorry bout the delay on this. Just got back on getting a basic
>> working implementation in Iceberg for Vectorization on primitive types.
>>
>>
>>
>> *Here's what I have so far :  *
>>
>>
>>
>> I have added `ParquetValueReader` implementations for some basic
>> primitive types that build the respective Arrow Vector (`ValueVector`) viz.
>> `IntVector` for int, `VarCharVector` for strings and so on. Underneath each
>> value vector reader there are column iterators that read from the parquet
>> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
>> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
>> stitched together using a `ColumnarBatchReader` (which as the name suggests
>> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
>> work properly with the underlying interfaces.  I'v also made changes to
>> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
>> `SupportsScanColumnarBatch` mixin to the reader).  So the reader now
>> expects ColumnarBatch instances (instead of InternalRow). The query
>> planning runtime works fine with these changes.
>>
>>
>>
>> Although it fails during query execution, the bit it's  currently failing
>> at is this line of code : https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>>
>>
>>
>> This code, I think,  tries to apply the iterator's schema projection on
>> the InternalRow instances. This seems to be tightly coupled to InternalRow
>> as Spark's catalyst expressions have implemented the UnsafeProjection for
>> InternalRow only. If I take this out and just return the
>> `Iterator<ColumnarBatch>` iterator I built it returns empty result on the
>> client. I'm guessing this is coz Spark is unaware of the iterator's schema?
>> There's a Todo in the code that says "*remove the projection by
>> reporting the iterator's schema back to Spark*".  Is there a simple way
>> to communicate that to Spark for my new iterator? Any pointers on how to
>> get around this?
>>
>>
>>
>>
>>
>> Thanks and Regards,
>>
>> -Gautam.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com> wrote:
>>
>> Replies inline.
>>
>>
>>
>> On Fri, Jun 14, 2019 at 1:11 AM Gautam <ga...@gmail.com> wrote:
>>
>> Thanks for responding Ryan,
>>
>>
>>
>> Couple of follow up questions on ParquetValueReader for Arrow..
>>
>>
>>
>> I'd like to start with testing Arrow out with readers for primitive type
>> and incrementally add in Struct/Array support, also ArrowWriter [1]
>> currently doesn't have converters for map type. How can I default these
>> types to regular materialization whilst supporting Arrow based support for
>> primitives?
>>
>>
>>
>> We should look at what Spark does to handle maps.
>>
>>
>>
>> I think we should get the prototype working with test cases that don't
>> have maps, structs, or lists. Just getting primitives working is a good
>> start and just won't hit these problems.
>>
>>
>>
>> Lemme know if this makes sense...
>>
>>
>>
>> - I extend  PrimitiveReader (for Arrow) that loads primitive types into
>> ArrowColumnVectors of corresponding column types by iterating over
>> underlying ColumnIterator *n times*, where n is size of batch.
>>
>>
>>
>> Sounds good to me. I'm not sure about extending vs wrapping because I'm
>> not too familiar with the Arrow APIs.
>>
>>
>>
>> - Reader.newParquetIterable()  maps primitive column types to the newly
>> added ArrowParquetValueReader but for other types (nested types, etc.) uses
>> current *InternalRow* based ValueReaders
>>
>>
>>
>> Sounds good for primitives, but I would just leave the nested types
>> un-implemented for now.
>>
>>
>>
>> - Stitch the columns vectors together to create ColumnarBatch, (Since
>> *SupportsScanColumnarBatch* mixin currently expects this ) .. *although* *I'm
>> a bit lost on how the stitching of columns happens currently*? .. and
>> how the ArrowColumnVectors could  be stitched alongside regular columns
>> that don't have arrow based support ?
>>
>>
>>
>> I don't think that you can mix regular columns and Arrow columns. It has
>> to be all one or the other. That's why it's easier to start with
>> primitives, then add structs, then lists, and finally maps.
>>
>>
>>
>> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so that
>> DataSourceV2ScanExec starts using ColumnarBatch scans
>>
>>
>>
>> We will probably need two paths. One for columnar batches and one for
>> row-based reads. That doesn't need to be done right away and what you
>> already have in your working copy makes sense as a start.
>>
>>
>>
>> That's a lot of questions! :-) but hope i'm making sense.
>>
>>
>>
>> -Gautam.
>>
>>
>>
>>
>>
>>
>>
>> [1] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>
>>
>>
>> --
>>
>> Ryan Blue
>>
>> Software Engineer
>>
>> Netflix
>>
>>

Re: Approaching Vectorized Reading in Iceberg ..

Posted by Gautam <ga...@gmail.com>.
Will do. Doing a bit of housekeeping on the code and also adding more
primitive type support.

On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah <mc...@palantir.com> wrote:

> Would it be possible to put the work in progress code in open source?
>
>
>
> *From: *Gautam <ga...@gmail.com>
> *Reply-To: *"dev@iceberg.apache.org" <de...@iceberg.apache.org>
> *Date: *Monday, July 22, 2019 at 9:46 AM
> *To: *Daniel Weeks <dw...@netflix.com>
> *Cc: *Ryan Blue <rb...@netflix.com>, Iceberg Dev List <
> dev@iceberg.apache.org>
> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>
>
>
> That would be great!
>
>
>
> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks <dw...@netflix.com> wrote:
>
> Hey Gautam,
>
>
>
> We also have a couple people looking into vectorized reading (into Arrow
> memory).  I think it would be good for us to get together and see if we can
> collaborate on a common approach for this.
>
>
>
> I'll reach out directly and see if we can get together.
>
>
>
> -Dan
>
>
>
> On Sun, Jul 21, 2019 at 10:35 PM Gautam <ga...@gmail.com> wrote:
>
> Figured this out. I'm returning ColumnarBatch iterator directly without
> projection with schema set appropriately in `readSchema() `.. the empty
> result was due to valuesRead not being set correctly on FileIterator. Did
> that and things are working. Will circle back with numbers soon.
>
>
>
> On Fri, Jul 19, 2019 at 5:22 PM Gautam <ga...@gmail.com> wrote:
>
> Hey Guys,
>
>            Sorry bout the delay on this. Just got back on getting a basic
> working implementation in Iceberg for Vectorization on primitive types.
>
>
>
> *Here's what I have so far :  *
>
>
>
> I have added `ParquetValueReader` implementations for some basic primitive
> types that build the respective Arrow Vector (`ValueVector`) viz.
> `IntVector` for int, `VarCharVector` for strings and so on. Underneath each
> value vector reader there are column iterators that read from the parquet
> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
> stitched together using a `ColumnarBatchReader` (which as the name suggests
> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
> work properly with the underlying interfaces.  I'v also made changes to
> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
> `SupportsScanColumnarBatch` mixin to the reader).  So the reader now
> expects ColumnarBatch instances (instead of InternalRow). The query
> planning runtime works fine with these changes.
>
>
>
> Although it fails during query execution, the bit it's  currently failing
> at is this line of code : https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>
>
>
> This code, I think,  tries to apply the iterator's schema projection on
> the InternalRow instances. This seems to be tightly coupled to InternalRow
> as Spark's catalyst expressions have implemented the UnsafeProjection for
> InternalRow only. If I take this out and just return the
> `Iterator<ColumnarBatch>` iterator I built it returns empty result on the
> client. I'm guessing this is coz Spark is unaware of the iterator's schema?
> There's a Todo in the code that says "*remove the projection by reporting
> the iterator's schema back to Spark*".  Is there a simple way to
> communicate that to Spark for my new iterator? Any pointers on how to get
> around this?
>
>
>
>
>
> Thanks and Regards,
>
> -Gautam.
>
>
>
>
>
>
>
>
>
> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com> wrote:
>
> Replies inline.
>
>
>
> On Fri, Jun 14, 2019 at 1:11 AM Gautam <ga...@gmail.com> wrote:
>
> Thanks for responding Ryan,
>
>
>
> Couple of follow up questions on ParquetValueReader for Arrow..
>
>
>
> I'd like to start with testing Arrow out with readers for primitive type
> and incrementally add in Struct/Array support, also ArrowWriter [1]
> currently doesn't have converters for map type. How can I default these
> types to regular materialization whilst supporting Arrow based support for
> primitives?
>
>
>
> We should look at what Spark does to handle maps.
>
>
>
> I think we should get the prototype working with test cases that don't
> have maps, structs, or lists. Just getting primitives working is a good
> start and just won't hit these problems.
>
>
>
> Lemme know if this makes sense...
>
>
>
> - I extend  PrimitiveReader (for Arrow) that loads primitive types into
> ArrowColumnVectors of corresponding column types by iterating over
> underlying ColumnIterator *n times*, where n is size of batch.
>
>
>
> Sounds good to me. I'm not sure about extending vs wrapping because I'm
> not too familiar with the Arrow APIs.
>
>
>
> - Reader.newParquetIterable()  maps primitive column types to the newly
> added ArrowParquetValueReader but for other types (nested types, etc.) uses
> current *InternalRow* based ValueReaders
>
>
>
> Sounds good for primitives, but I would just leave the nested types
> un-implemented for now.
>
>
>
> - Stitch the columns vectors together to create ColumnarBatch, (Since
> *SupportsScanColumnarBatch* mixin currently expects this ) .. *although* *I'm
> a bit lost on how the stitching of columns happens currently*? .. and how
> the ArrowColumnVectors could  be stitched alongside regular columns that
> don't have arrow based support ?
>
>
>
> I don't think that you can mix regular columns and Arrow columns. It has
> to be all one or the other. That's why it's easier to start with
> primitives, then add structs, then lists, and finally maps.
>
>
>
> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so that
> DataSourceV2ScanExec starts using ColumnarBatch scans
>
>
>
> We will probably need two paths. One for columnar batches and one for
> row-based reads. That doesn't need to be done right away and what you
> already have in your working copy makes sense as a start.
>
>
>
> That's a lot of questions! :-) but hope i'm making sense.
>
>
>
> -Gautam.
>
>
>
>
>
>
>
> [1] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>
>
>
> --
>
> Ryan Blue
>
> Software Engineer
>
> Netflix
>
>

Re: Approaching Vectorized Reading in Iceberg ..

Posted by Matt Cheah <mc...@palantir.com>.
Would it be possible to put the work in progress code in open source?

 

From: Gautam <ga...@gmail.com>
Reply-To: "dev@iceberg.apache.org" <de...@iceberg.apache.org>
Date: Monday, July 22, 2019 at 9:46 AM
To: Daniel Weeks <dw...@netflix.com>
Cc: Ryan Blue <rb...@netflix.com>, Iceberg Dev List <de...@iceberg.apache.org>
Subject: Re: Approaching Vectorized Reading in Iceberg ..

 

That would be great!

 

On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks <dw...@netflix.com> wrote:

Hey Gautam, 

 

We also have a couple people looking into vectorized reading (into Arrow memory).  I think it would be good for us to get together and see if we can collaborate on a common approach for this.

 

I'll reach out directly and see if we can get together.

 

-Dan

 

On Sun, Jul 21, 2019 at 10:35 PM Gautam <ga...@gmail.com> wrote:

Figured this out. I'm returning ColumnarBatch iterator directly without projection with schema set appropriately in `readSchema() `.. the empty result was due to valuesRead not being set correctly on FileIterator. Did that and things are working. Will circle back with numbers soon. 
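
The schema half of that fix boils down to reporting the projected schema
through DataSourceReader.readSchema(), so Spark trusts the ColumnarBatch
rows as-is instead of trying to project them. A minimal sketch of the idea
(not the actual Reader code; it assumes Iceberg's SparkSchemaUtil.convert
helper is available):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.spark.SparkSchemaUtil;
    import org.apache.spark.sql.types.StructType;

    class ProjectedSchemaHolder {
      private final Schema projection;  // the Iceberg projection for this scan

      ProjectedSchemaHolder(Schema projection) {
        this.projection = projection;
      }

      // What DataSourceReader.readSchema() should hand back: the same
      // projection the vectorized readers were built with, converted to a
      // Spark StructType.
      StructType readSchema() {
        return SparkSchemaUtil.convert(projection);
      }
    }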

 

On Fri, Jul 19, 2019 at 5:22 PM Gautam <ga...@gmail.com> wrote:

Hey Guys, 

           Sorry bout the delay on this. Just got back on getting a basic working implementation in Iceberg for Vectorization on primitive types. 

 

Here's what I have so far :  

 

I have added `ParquetValueReader` implementations for some basic primitive types that build the respective Arrow Vector (`ValueVector`) viz. `IntVector` for int, `VarCharVector` for strings and so on. Underneath each value vector reader there are column iterators that read from the parquet pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and stitched together using a `ColumnarBatchReader` (which as the name suggests wraps ColumnarBatches in the iterator)   I'v verified that these pieces work properly with the underlying interfaces.  I'v also made changes to Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the `SupportsScanColumnarBatch` mixin to the reader).  So the reader now expects ColumnarBatch instances (instead of InternalRow). The query planning runtime works fine with these changes.

 

Although it fails during query execution, the bit it's  currently failing at is this line of code : https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414

 

This code, I think,  tries to apply the iterator's schema projection on the InternalRow instances. This seems to be tightly coupled to InternalRow as Spark's catalyst expressions have implemented the UnsafeProjection for InternalRow only. If I take this out and just return the `Iterator<ColumnarBatch>` iterator I built it returns empty result on the client. I'm guessing this is coz Spark is unaware of the iterator's schema? There's a Todo in the code that says "remove the projection by reporting the iterator's schema back to Spark".  Is there a simple way to communicate that to Spark for my new iterator? Any pointers on how to get around this?

 

 

Thanks and Regards,

-Gautam. 

 

 

 

 

On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com> wrote:

Replies inline.

 

On Fri, Jun 14, 2019 at 1:11 AM Gautam <ga...@gmail.com> wrote:

Thanks for responding Ryan,  

 

Couple of follow up questions on ParquetValueReader for Arrow.. 

 

I'd like to start with testing Arrow out with readers for primitive type and incrementally add in Struct/Array support, also ArrowWriter [1] currently doesn't have converters for map type. How can I default these types to regular materialization whilst supporting Arrow based support for primitives? 

 

We should look at what Spark does to handle maps.

 

I think we should get the prototype working with test cases that don't have maps, structs, or lists. Just getting primitives working is a good start and just won't hit these problems.

 

Lemme know if this makes sense...

 

- I extend  PrimitiveReader (for Arrow) that loads primitive types into ArrowColumnVectors of corresponding column types by iterating over underlying ColumnIterator n times, where n is size of batch.

 

Sounds good to me. I'm not sure about extending vs wrapping because I'm not too familiar with the Arrow APIs.

 

- Reader.newParquetIterable()  maps primitive column types to the newly added ArrowParquetValueReader but for other types (nested types, etc.) uses current InternalRow based ValueReaders 

 

Sounds good for primitives, but I would just leave the nested types un-implemented for now.

 

- Stitch the columns vectors together to create ColumnarBatch, (Since SupportsScanColumnarBatch mixin currently expects this ) .. although I'm a bit lost on how the stitching of columns happens currently? .. and how the ArrowColumnVectors could  be stitched alongside regular columns that don't have arrow based support ?

 

I don't think that you can mix regular columns and Arrow columns. It has to be all one or the other. That's why it's easier to start with primitives, then add structs, then lists, and finally maps.

 

- Reader returns readTasks as  InputPartition<ColumnarBatch> so that DataSourceV2ScanExec starts using ColumnarBatch scans

 

We will probably need two paths. One for columnar batches and one for row-based reads. That doesn't need to be done right away and what you already have in your working copy makes sense as a start.
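
One way to make that split concrete is a small eligibility check the reader
can use to pick between the batch path and the existing row path. This is
only a sketch of the idea (the helper name is made up and it assumes the
Iceberg Schema/Type API), not anything from the working copy:

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    class BatchReadSupport {
      // Use the columnar-batch path only when every projected top-level
      // column is a primitive the Arrow value readers handle; otherwise
      // fall back to the InternalRow path.
      static boolean canReadAsBatches(Schema projection) {
        for (Types.NestedField field : projection.columns()) {
          if (!field.type().isPrimitiveType()) {
            return false;
          }
        }
        return true;
      }
    }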

 

That's a lot of questions! :-) But I hope I'm making sense.

 

-Gautam.

 

 

 

[1] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala

 

-- 

Ryan Blue 

Software Engineer

Netflix


Re: Approaching Vectorized Reading in Iceberg ..

Posted by Gautam <ga...@gmail.com>.
That would be great!

On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks <dw...@netflix.com> wrote:

> Hey Gautam,
>
> We also have a couple people looking into vectorized reading (into Arrow
> memory).  I think it would be good for us to get together and see if we can
> collaborate on a common approach for this.
>
> I'll reach out directly and see if we can get together.
>
> -Dan
>
> On Sun, Jul 21, 2019 at 10:35 PM Gautam <ga...@gmail.com> wrote:
>
>> Figured this out. I'm returning ColumnarBatch iterator directly without
>> projection with schema set appropriately in `readSchema() `.. the empty
>> result was due to valuesRead not being set correctly on FileIterator. Did
>> that and things are working. Will circle back with numbers soon.

Re: Approaching Vectorized Reading in Iceberg ..

Posted by Daniel Weeks <dw...@netflix.com.INVALID>.
Hey Gautam,

We also have a couple people looking into vectorized reading (into Arrow
memory).  I think it would be good for us to get together and see if we can
collaborate on a common approach for this.

I'll reach out directly and see if we can get together.

-Dan

On Sun, Jul 21, 2019 at 10:35 PM Gautam <ga...@gmail.com> wrote:

> Figured this out. I'm returning ColumnarBatch iterator directly without
> projection with schema set appropriately in `readSchema() `.. the empty
> result was due to valuesRead not being set correctly on FileIterator. Did
> that and things are working. Will circle back with numbers soon.

Re: Approaching Vectorized Reading in Iceberg ..

Posted by Gautam <ga...@gmail.com>.
Figured this out. I'm returning the ColumnarBatch iterator directly, without
the projection, with the schema set appropriately in `readSchema()`. The
empty result was due to valuesRead not being set correctly on FileIterator;
fixed that and things are working. Will circle back with numbers soon.
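
For the record, the schema part of the fix is roughly this (a sketch; the field holding the projected Iceberg schema is a stand-in for however the Reader actually caches it):

import org.apache.iceberg.Schema;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.types.StructType;

class ReadSchemaSketch {
  private final Schema projectedIcebergSchema;   // the pruned read schema the batches are built with

  ReadSchemaSketch(Schema projectedIcebergSchema) {
    this.projectedIcebergSchema = projectedIcebergSchema;
  }

  public StructType readSchema() {
    // Report the same schema the ColumnarBatches use; Spark then reads the
    // batches directly and no InternalRow projection is applied.
    return SparkSchemaUtil.convert(projectedIcebergSchema);
  }
}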
